package org.apache.flink.connector.file.table.stream;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.state.OperatorStateStore;
import org.apache.flink.api.common.typeutils.base.LongSerializer;
import org.apache.flink.api.common.typeutils.base.MapSerializer;
import org.apache.flink.api.common.typeutils.base.StringSerializer;
import org.apache.flink.connector.file.table.stream.PartitionCommitPredicate;
import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
import org.apache.flink.util.StringUtils;

@Internal
/* loaded from: input_file:org/apache/flink/connector/file/table/stream/ProcTimeCommitTrigger.class */
public class ProcTimeCommitTrigger implements PartitionCommitTrigger {
    private static final ListStateDescriptor<Map<String, Long>> PENDING_PARTITIONS_STATE_DESC = new ListStateDescriptor<>("pending-partitions-with-time", new MapSerializer(StringSerializer.INSTANCE, LongSerializer.INSTANCE));
    private final ListState<Map<String, Long>> pendingPartitionsState;
    private final Map<String, Long> pendingPartitions = new HashMap();
    private final ProcessingTimeService procTimeService;
    private final PartitionCommitPredicate partitionCommitPredicate;

    public ProcTimeCommitTrigger(boolean z, OperatorStateStore operatorStateStore, ProcessingTimeService processingTimeService, PartitionCommitPredicate partitionCommitPredicate) throws Exception {
        this.pendingPartitionsState = operatorStateStore.getListState(PENDING_PARTITIONS_STATE_DESC);
        if (z) {
            this.pendingPartitions.putAll((Map) this.pendingPartitionsState.get().iterator().next());
        }
        this.procTimeService = processingTimeService;
        this.partitionCommitPredicate = partitionCommitPredicate;
    }

    @Override // org.apache.flink.connector.file.table.stream.PartitionCommitTrigger
    public void addPartition(String str) {
        if (StringUtils.isNullOrWhitespaceOnly(str)) {
            return;
        }
        this.pendingPartitions.putIfAbsent(str, Long.valueOf(this.procTimeService.getCurrentProcessingTime()));
    }

    @Override // org.apache.flink.connector.file.table.stream.PartitionCommitTrigger
    public List<String> committablePartitions(long j) {
        ArrayList arrayList = new ArrayList();
        Iterator<Map.Entry<String, Long>> it = this.pendingPartitions.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, Long> next = it.next();
            if (this.partitionCommitPredicate.isPartitionCommittable(createPredicateContext(next.getKey(), next.getValue().longValue()))) {
                arrayList.add(next.getKey());
                it.remove();
            }
        }
        return arrayList;
    }

    private PartitionCommitPredicate.PredicateContext createPredicateContext(final String str, final long j) {
        return new PartitionCommitPredicate.PredicateContext() { // from class: org.apache.flink.connector.file.table.stream.ProcTimeCommitTrigger.1
            @Override // org.apache.flink.connector.file.table.stream.PartitionCommitPredicate.PredicateContext
            public String partition() {
                return str;
            }

            @Override // org.apache.flink.connector.file.table.stream.PartitionCommitPredicate.PredicateContext
            public long createProcTime() {
                return j;
            }

            @Override // org.apache.flink.connector.file.table.stream.PartitionCommitPredicate.PredicateContext
            public long currentProcTime() {
                return ProcTimeCommitTrigger.this.procTimeService.getCurrentProcessingTime();
            }

            @Override // org.apache.flink.connector.file.table.stream.PartitionCommitPredicate.PredicateContext
            public long currentWatermark() {
                throw new UnsupportedOperationException("Method currentWatermark isn't supported in ProcTimeCommitTrigger.");
            }
        };
    }

    @Override // org.apache.flink.connector.file.table.stream.PartitionCommitTrigger
    public void snapshotState(long j, long j2) throws Exception {
        this.pendingPartitionsState.clear();
        this.pendingPartitionsState.add(new HashMap(this.pendingPartitions));
    }

    @Override // org.apache.flink.connector.file.table.stream.PartitionCommitTrigger
    public List<String> endInput() {
        ArrayList arrayList = new ArrayList(this.pendingPartitions.keySet());
        this.pendingPartitions.clear();
        return arrayList;
    }
}
