package org.apache.flink.table.runtime.operators.bundle;

import java.util.HashMap;
import java.util.Map;
import org.apache.flink.api.common.functions.util.FunctionUtils;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.ChainingStrategy;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.table.runtime.context.ExecutionContextImpl;
import org.apache.flink.table.runtime.operators.bundle.trigger.BundleTrigger;
import org.apache.flink.table.runtime.operators.bundle.trigger.BundleTriggerCallback;
import org.apache.flink.table.runtime.util.StreamRecordCollector;
import org.apache.flink.util.Collector;
import org.apache.flink.util.Preconditions;

/* loaded from: input_file:org/apache/flink/table/runtime/operators/bundle/AbstractMapBundleOperator.class */
public abstract class AbstractMapBundleOperator<K, V, IN, OUT> extends AbstractStreamOperator<OUT> implements OneInputStreamOperator<IN, OUT>, BundleTriggerCallback {
    private static final long serialVersionUID = 5081841938324118594L;
    private transient Map<K, V> bundle;
    private final BundleTrigger<IN> bundleTrigger;
    private final MapBundleFunction<K, V, IN, OUT> function;
    private transient Collector<OUT> collector;
    private transient int numOfElements = 0;

    /* JADX INFO: Access modifiers changed from: package-private */
    public AbstractMapBundleOperator(MapBundleFunction<K, V, IN, OUT> mapBundleFunction, BundleTrigger<IN> bundleTrigger) {
        this.chainingStrategy = ChainingStrategy.ALWAYS;
        this.function = (MapBundleFunction) Preconditions.checkNotNull(mapBundleFunction, "function is null");
        this.bundleTrigger = (BundleTrigger) Preconditions.checkNotNull(bundleTrigger, "bundleTrigger is null");
    }

    @Override // org.apache.flink.streaming.api.operators.AbstractStreamOperator, org.apache.flink.streaming.api.operators.StreamOperator
    public void open() throws Exception {
        super.open();
        this.function.open(new ExecutionContextImpl(this, getRuntimeContext()));
        this.numOfElements = 0;
        this.collector = new StreamRecordCollector(this.output);
        this.bundle = new HashMap();
        this.bundleTrigger.registerCallback(this);
        this.bundleTrigger.reset();
        LOG.info("BundleOperator's trigger info: " + this.bundleTrigger.explain());
        getRuntimeContext().getMetricGroup().gauge("bundleSize", () -> {
            return Integer.valueOf(this.numOfElements);
        });
        getRuntimeContext().getMetricGroup().gauge("bundleRatio", () -> {
            int size = this.bundle.size();
            return size == 0 ? Double.valueOf(0.0d) : Double.valueOf((1.0d * this.numOfElements) / size);
        });
    }

    @Override // org.apache.flink.streaming.api.operators.Input
    public void processElement(StreamRecord<IN> streamRecord) throws Exception {
        IN value = streamRecord.getValue();
        K key = getKey(value);
        this.bundle.put(key, this.function.addInput(this.bundle.get(key), value));
        this.numOfElements++;
        this.bundleTrigger.onElement(value);
    }

    protected abstract K getKey(IN in) throws Exception;

    @Override // org.apache.flink.table.runtime.operators.bundle.trigger.BundleTriggerCallback
    public void finishBundle() throws Exception {
        if (this.bundle != null && !this.bundle.isEmpty()) {
            this.numOfElements = 0;
            this.function.finishBundle(this.bundle, this.collector);
            this.bundle.clear();
        }
        this.bundleTrigger.reset();
    }

    @Override // org.apache.flink.streaming.api.operators.AbstractStreamOperator, org.apache.flink.streaming.api.operators.Input
    public void processWatermark(Watermark watermark) throws Exception {
        finishBundle();
        super.processWatermark(watermark);
    }

    @Override // org.apache.flink.streaming.api.operators.AbstractStreamOperator, org.apache.flink.streaming.api.operators.StreamOperator
    public void prepareSnapshotPreBarrier(long j) throws Exception {
        finishBundle();
    }

    @Override // org.apache.flink.streaming.api.operators.AbstractStreamOperator, org.apache.flink.streaming.api.operators.StreamOperator
    public void finish() throws Exception {
        finishBundle();
        super.finish();
    }

    @Override // org.apache.flink.streaming.api.operators.AbstractStreamOperator, org.apache.flink.streaming.api.operators.StreamOperator
    public void close() throws Exception {
        Exception exc = null;
        try {
            super.close();
            if (this.function != null) {
                FunctionUtils.closeFunction(this.function);
            }
        } catch (InterruptedException e) {
            exc = e;
            Thread.currentThread().interrupt();
        } catch (Exception e2) {
            exc = e2;
        }
        if (exc != null) {
            LOG.warn("Errors occurred while closing the BundleOperator.", exc);
        }
    }
}
