package org.apache.flink.table.store.connector.sink;

import java.io.IOException;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.base.IntSerializer;
import org.apache.flink.api.common.typeutils.base.LongSerializer;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.api.java.typeutils.runtime.TupleSerializer;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
import org.apache.flink.runtime.state.StateInitializationContext;
import org.apache.flink.runtime.state.StateSnapshotContext;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.binary.BinaryRowData;
import org.apache.flink.table.runtime.typeutils.BinaryRowDataSerializer;
import org.apache.flink.table.store.file.Snapshot;
import org.apache.flink.table.store.file.utils.SnapshotManager;
import org.apache.flink.table.store.table.FileStoreTable;
import org.apache.flink.table.store.table.sink.SinkRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/table/store/connector/sink/FullChangelogStoreSinkWrite.class */
/**
 * A {@link StoreSinkWriteImpl} that remembers, per checkpoint, which (partition, bucket) pairs
 * were touched and submits a full compaction for all of them once the oldest remembered write is
 * at least {@code fullCompactionThresholdMs} old (or when the caller asks for a compaction).
 *
 * <p>Both the touched buckets and the first-write timestamps are stored in operator list state in
 * {@link #snapshotState(StateSnapshotContext)} and read back in the constructor.
 */
public class FullChangelogStoreSinkWrite extends StoreSinkWriteImpl {
    private static final Logger LOG = LoggerFactory.getLogger(FullChangelogStoreSinkWrite.class);

    /** Minimum age (ms) of the oldest remembered write before a full compaction is forced. */
    private final long fullCompactionThresholdMs;

    /** Buckets touched since the last {@link #prepareCommit(boolean, long)}. */
    private final Set<Tuple2<BinaryRowData, Integer>> currentWrittenBuckets;

    /** Touched buckets keyed by the identifier passed to {@code prepareCommit}. */
    private final NavigableMap<Long, Set<Tuple2<BinaryRowData, Integer>>> writtenBuckets;

    private final ListState<Tuple3<Long, BinaryRowData, Integer>> writtenBucketState;

    /** Wall-clock time of the first touch since the last {@code prepareCommit}, or null. */
    private Long currentFirstWriteMs;

    /** First-touch wall-clock time keyed by the identifier passed to {@code prepareCommit}. */
    private final NavigableMap<Long, Long> firstWriteMs;

    private final ListState<Tuple2<Long, Long>> firstWriteMsState;

    /** Identifiers for which a full compaction was submitted but not yet seen as a snapshot. */
    private transient TreeSet<Long> commitIdentifiersToCheck;

    /**
     * Creates the writer and restores the bucket / timestamp bookkeeping from operator state.
     *
     * @param fileStoreTable table being written; also supplies the partition arity for the state
     *     serializer
     * @param stateInitializationContext source of the operator state store
     * @param str forwarded unchanged to the parent constructor
     * @param iOManager forwarded unchanged to the parent constructor
     * @param z forwarded unchanged to the parent constructor
     * @param j full compaction threshold in milliseconds
     * @throws Exception if the parent constructor or state access fails
     */
    public FullChangelogStoreSinkWrite(FileStoreTable fileStoreTable, StateInitializationContext stateInitializationContext, String str, IOManager iOManager, boolean z, long j) throws Exception {
        super(fileStoreTable, stateInitializationContext, str, iOManager, z);
        this.fullCompactionThresholdMs = j;

        this.currentWrittenBuckets = new HashSet<>();

        // Tuple3.class carries no type arguments, so the class token needs an unchecked cast.
        @SuppressWarnings("unchecked")
        TupleSerializer<Tuple3<Long, BinaryRowData, Integer>> writtenBucketSerializer =
                new TupleSerializer<>(
                        (Class<Tuple3<Long, BinaryRowData, Integer>>) (Class<?>) Tuple3.class,
                        new TypeSerializer<?>[] {
                            LongSerializer.INSTANCE,
                            new BinaryRowDataSerializer(
                                    fileStoreTable.schema().logicalPartitionType().getFieldCount()),
                            IntSerializer.INSTANCE
                        });
        this.writtenBucketState =
                stateInitializationContext
                        .getOperatorStateStore()
                        .getListState(
                                new ListStateDescriptor<>(
                                        "table_store_written_buckets", writtenBucketSerializer));
        this.writtenBuckets = new TreeMap<>();
        for (Tuple3<Long, BinaryRowData, Integer> restored : this.writtenBucketState.get()) {
            this.writtenBuckets
                    .computeIfAbsent(restored.f0, key -> new HashSet<>())
                    .add(Tuple2.of(restored.f1, restored.f2));
        }

        this.currentFirstWriteMs = null;

        @SuppressWarnings("unchecked")
        TupleSerializer<Tuple2<Long, Long>> firstWriteMsSerializer =
                new TupleSerializer<>(
                        (Class<Tuple2<Long, Long>>) (Class<?>) Tuple2.class,
                        new TypeSerializer<?>[] {LongSerializer.INSTANCE, LongSerializer.INSTANCE});
        this.firstWriteMsState =
                stateInitializationContext
                        .getOperatorStateStore()
                        .getListState(
                                new ListStateDescriptor<>("first_write_ms", firstWriteMsSerializer));
        this.firstWriteMs = new TreeMap<>();
        // Restore the timestamps; without this the threshold check in prepareCommit has nothing
        // to look at after recovery. If an identifier shows up more than once, keep the earliest
        // timestamp.
        for (Tuple2<Long, Long> restored : this.firstWriteMsState.get()) {
            this.firstWriteMs.merge(restored.f0, restored.f1, Math::min);
        }

        this.commitIdentifiersToCheck = new TreeSet<>();
    }

    /**
     * Writes the row through the parent and records the partition and bucket it landed in.
     *
     * @param rowData row to write
     * @return the record returned by the parent
     * @throws Exception if the parent write fails
     */
    @Override
    public SinkRecord write(RowData rowData) throws Exception {
        SinkRecord record = super.write(rowData);
        touchBucket(record.partition(), record.bucket());
        return record;
    }

    /**
     * Runs the parent compaction and records the given partition and bucket as touched.
     *
     * @param binaryRowData partition of the bucket
     * @param i bucket number
     * @param z forwarded unchanged to the parent
     * @throws Exception if the parent compaction fails
     */
    @Override
    public void compact(BinaryRowData binaryRowData, int i, boolean z) throws Exception {
        super.compact(binaryRowData, i, z);
        touchBucket(binaryRowData, i);
    }

    /** Marks a bucket as touched and starts the first-write clock if it is not running yet. */
    private void touchBucket(BinaryRowData partition, int bucket) {
        if (LOG.isDebugEnabled()) {
            LOG.debug("touch partition {}, bucket {}", partition, bucket);
        }

        // Look up with the caller's instance first so the partition is only copied when the
        // bucket is actually new.
        if (!currentWrittenBuckets.contains(Tuple2.of(partition, bucket))) {
            currentWrittenBuckets.add(Tuple2.of(partition.copy(), bucket));
        }

        if (currentFirstWriteMs == null) {
            currentFirstWriteMs = System.currentTimeMillis();
        }
    }

    /**
     * Files the buckets touched since the previous call under {@code j}, decides whether a full
     * compaction is due, submits it if so, and then delegates to the parent.
     *
     * @param z whether the caller requests a compaction; upgraded to {@code true} when the oldest
     *     remembered write has reached the threshold
     * @param j identifier of this commit (logged as the checkpoint number)
     * @return the committables produced by the parent
     * @throws IOException if the parent fails
     */
    @Override
    public List<Committable> prepareCommit(boolean z, long j) throws IOException {
        boolean doCompaction = z;
        final long checkpointId = j;

        checkSuccessfulFullCompaction();

        if (!currentWrittenBuckets.isEmpty()) {
            writtenBuckets
                    .computeIfAbsent(checkpointId, key -> new HashSet<>())
                    .addAll(currentWrittenBuckets);
            currentWrittenBuckets.clear();
            firstWriteMs.putIfAbsent(checkpointId, currentFirstWriteMs);
            currentFirstWriteMs = null;
        }

        if (LOG.isDebugEnabled()) {
            for (Map.Entry<Long, Set<Tuple2<BinaryRowData, Integer>>> entry : writtenBuckets.entrySet()) {
                LOG.debug("Written buckets for checkpoint #{} are:", entry.getKey());
                for (Tuple2<BinaryRowData, Integer> bucket : entry.getValue()) {
                    LOG.debug("  * partition {}, bucket {}", bucket.f0, bucket.f1);
                }
            }
        }

        // writtenBuckets and firstWriteMs come from two independent states, so the timestamp map
        // may be empty while buckets are present; guard the entry instead of dereferencing it.
        Map.Entry<Long, Long> oldestWrite = firstWriteMs.firstEntry();
        if (!writtenBuckets.isEmpty()
                && oldestWrite != null
                && System.currentTimeMillis() - oldestWrite.getValue() >= fullCompactionThresholdMs) {
            doCompaction = true;
        }

        if (doCompaction) {
            if (LOG.isDebugEnabled()) {
                LOG.debug("Submit full compaction for checkpoint #{}", checkpointId);
            }
            submitFullCompaction();
            commitIdentifiersToCheck.add(checkpointId);
        }

        return super.prepareCommit(doCompaction, checkpointId);
    }

    /**
     * Scans snapshots from latest to earliest for a COMPACT snapshot committed by this writer's
     * commit user whose identifier is awaited; on the first hit, drops all bookkeeping up to and
     * including that identifier.
     */
    private void checkSuccessfulFullCompaction() {
        SnapshotManager snapshotManager = table.snapshotManager();
        Long latestSnapshotId = snapshotManager.latestSnapshotId();
        if (latestSnapshotId == null) {
            return;
        }
        Long earliestSnapshotId = snapshotManager.earliestSnapshotId();
        if (earliestSnapshotId == null) {
            return;
        }

        final long earliest = earliestSnapshotId;
        for (long id = latestSnapshotId; id >= earliest; id--) {
            Snapshot snapshot = snapshotManager.snapshot(id);
            if (!snapshot.commitUser().equals(commitUser)
                    || snapshot.commitKind() != Snapshot.CommitKind.COMPACT) {
                continue;
            }

            long commitIdentifier = snapshot.commitIdentifier();
            if (!commitIdentifiersToCheck.contains(commitIdentifier)) {
                continue;
            }

            if (LOG.isDebugEnabled()) {
                LOG.debug("Found full compaction snapshot #{} with identifier {}", id, commitIdentifier);
            }
            writtenBuckets.headMap(commitIdentifier, true).clear();
            firstWriteMs.headMap(commitIdentifier, true).clear();
            // Exclusive bound: the matched identifier itself stays in the set.
            commitIdentifiersToCheck.headSet(commitIdentifier).clear();
            return;
        }
    }

    /**
     * Submits a compaction for every remembered bucket exactly once, in ascending identifier order.
     *
     * @throws RuntimeException wrapping any exception thrown by the underlying compaction
     */
    private void submitFullCompaction() {
        Set<Tuple2<BinaryRowData, Integer>> submitted = new HashSet<>();
        for (Set<Tuple2<BinaryRowData, Integer>> buckets : writtenBuckets.values()) {
            for (Tuple2<BinaryRowData, Integer> bucket : buckets) {
                // The same bucket can be remembered under several identifiers.
                if (!submitted.add(bucket)) {
                    continue;
                }
                try {
                    write.compact(bucket.f0, bucket.f1, true);
                } catch (Exception e) {
                    throw new RuntimeException(e);
                }
            }
        }
    }

    /**
     * Snapshots the parent and then replaces both list states with the current in-memory
     * bookkeeping.
     *
     * @param stateSnapshotContext forwarded to the parent
     * @throws Exception if the parent snapshot or a state update fails
     */
    @Override
    public void snapshotState(StateSnapshotContext stateSnapshotContext) throws Exception {
        super.snapshotState(stateSnapshotContext);

        List<Tuple3<Long, BinaryRowData, Integer>> bucketEntries = new ArrayList<>();
        for (Map.Entry<Long, Set<Tuple2<BinaryRowData, Integer>>> entry : writtenBuckets.entrySet()) {
            for (Tuple2<BinaryRowData, Integer> bucket : entry.getValue()) {
                bucketEntries.add(Tuple3.of(entry.getKey(), bucket.f0, bucket.f1));
            }
        }
        writtenBucketState.update(bucketEntries);

        List<Tuple2<Long, Long>> firstWriteEntries = new ArrayList<>(firstWriteMs.size());
        for (Map.Entry<Long, Long> entry : firstWriteMs.entrySet()) {
            firstWriteEntries.add(Tuple2.of(entry.getKey(), entry.getValue()));
        }
        firstWriteMsState.update(firstWriteEntries);
    }
}
