package org.apache.flink.table.runtime.operators.aggregate.window;

import java.time.ZoneId;
import java.util.TimeZone;
import org.apache.flink.core.memory.ManagedMemoryUseCase;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.ChainingStrategy;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.operators.TimestampedCollector;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.runtime.keyselector.RowDataKeySelector;
import org.apache.flink.table.runtime.operators.aggregate.window.buffers.WindowBuffer;
import org.apache.flink.table.runtime.operators.window.slicing.ClockService;
import org.apache.flink.table.runtime.operators.window.slicing.SliceAssigner;
import org.apache.flink.table.runtime.util.TimeWindowUtil;

/* loaded from: input_file:org/apache/flink/table/runtime/operators/aggregate/window/LocalSlicingWindowAggOperator.class */
/**
 * One-input operator that collects incoming rows into a {@link WindowBuffer}, grouped by the key
 * produced by the {@link RowDataKeySelector} and by the slice end produced by the {@link
 * SliceAssigner}.
 *
 * <p>The buffer is told about progress when a watermark reaches the next trigger point and is
 * flushed right before a checkpoint barrier is emitted (see {@link
 * #prepareSnapshotPreBarrier(long)}). The operator itself declares no state of its own.
 *
 * <p>NOTE(review): judging by the class name and the use of {@code WindowBuffer.LocalFactory} this
 * is the "local" (pre-aggregation) half of a two-phase window aggregation; that is not verifiable
 * from this class alone.
 */
public class LocalSlicingWindowAggOperator extends AbstractStreamOperator<RowData> implements OneInputStreamOperator<RowData, RowData> {
    private static final long serialVersionUID = 1;

    /** Clock handed to the slice assigner for every element. */
    private static final ClockService CLOCK_SERVICE = ClockService.ofSystem();

    /** Extracts the grouping key from an input row. */
    private final RowDataKeySelector keySelector;

    /** Assigns the slice end to an input row. */
    private final SliceAssigner sliceAssigner;

    /** Slice end interval reported by the assigner; used to compute the next trigger watermark. */
    private final long windowInterval;

    /** Creates the window buffer in {@link #open()}. */
    private final WindowBuffer.LocalFactory windowBufferFactory;

    /** Time zone passed to the window buffer and to the trigger-watermark computation. */
    private final ZoneId shiftTimezone;

    /** Whether {@link #shiftTimezone} observes daylight saving time. */
    private final boolean useDayLightSaving;

    /** Collector wrapping the operator output; its timestamp is erased in {@link #open()}. */
    protected transient TimestampedCollector<RowData> collector;

    /**
     * Largest watermark timestamp seen so far. Being an uninitialized {@code long} field it starts
     * at 0, so watermarks with a timestamp of 0 or less are only forwarded downstream.
     */
    private transient long currentWatermark;

    /** Watermark timestamp at or beyond which the buffer is advanced next; starts at 0. */
    private transient long nextTriggerWatermark;

    /** Buffer holding the collected rows; created in {@link #open()}. */
    private transient WindowBuffer windowBuffer;

    /**
     * Creates the operator and sets its chaining strategy to {@link ChainingStrategy#ALWAYS}.
     *
     * @param rowDataKeySelector selector that extracts the grouping key from each input row
     * @param sliceAssigner assigner that determines the slice end of each input row and provides
     *     the slice end interval
     * @param localFactory factory used to create the window buffer when the operator is opened
     * @param zoneId time zone used by the window buffer and for computing trigger watermarks
     */
    public LocalSlicingWindowAggOperator(RowDataKeySelector rowDataKeySelector, SliceAssigner sliceAssigner, WindowBuffer.LocalFactory localFactory, ZoneId zoneId) {
        this.chainingStrategy = ChainingStrategy.ALWAYS;
        this.keySelector = rowDataKeySelector;
        this.sliceAssigner = sliceAssigner;
        this.windowInterval = sliceAssigner.getSliceEndInterval();
        this.windowBufferFactory = localFactory;
        this.shiftTimezone = zoneId;
        this.useDayLightSaving = TimeZone.getTimeZone(zoneId).useDaylightTime();
    }

    /**
     * Opens the operator: creates the timestamp-erasing collector and the window buffer, sizing
     * the buffer with the managed memory computed by {@link #computeMemorySize()}.
     *
     * @throws Exception if the superclass or the buffer factory fails
     */
    @Override // org.apache.flink.streaming.api.operators.AbstractStreamOperator, org.apache.flink.streaming.api.operators.StreamOperator
    public void open() throws Exception {
        super.open();
        this.collector = new TimestampedCollector<>(this.output);
        this.collector.eraseTimestamp();
        this.windowBuffer =
                this.windowBufferFactory.create(
                        getContainingTask(),
                        getContainingTask().getEnvironment().getMemoryManager(),
                        computeMemorySize(),
                        getRuntimeContext(),
                        this.collector,
                        this.shiftTimezone);
    }

    /**
     * Adds the record's row to the window buffer under its key and assigned slice end.
     *
     * @param streamRecord the incoming record
     * @throws Exception if key extraction, slice assignment or buffering fails
     */
    @Override // org.apache.flink.streaming.api.operators.Input
    public void processElement(StreamRecord<RowData> streamRecord) throws Exception {
        final RowData row = streamRecord.getValue();
        final RowData key = this.keySelector.getKey(row);
        final long sliceEnd = this.sliceAssigner.assignSliceEnd(row, CLOCK_SERVICE);
        this.windowBuffer.addElement(key, sliceEnd, row);
    }

    /**
     * Records a strictly larger watermark and, once it reaches the next trigger point, advances
     * the window buffer and recomputes that trigger point. The watermark is always passed on to
     * the superclass afterwards.
     *
     * @param watermark the incoming watermark
     * @throws Exception if advancing the buffer or the superclass handling fails
     */
    @Override // org.apache.flink.streaming.api.operators.AbstractStreamOperator, org.apache.flink.streaming.api.operators.Input
    public void processWatermark(Watermark watermark) throws Exception {
        if (watermark.getTimestamp() > this.currentWatermark) {
            this.currentWatermark = watermark.getTimestamp();
            // The buffer only needs to hear about progress that can reach the next trigger point.
            if (this.currentWatermark >= this.nextTriggerWatermark) {
                this.windowBuffer.advanceProgress(this.currentWatermark);
                this.nextTriggerWatermark =
                        TimeWindowUtil.getNextTriggerWatermark(
                                this.currentWatermark,
                                this.windowInterval,
                                this.shiftTimezone,
                                this.useDayLightSaving);
            }
        }
        super.processWatermark(watermark);
    }

    /**
     * Flushes the window buffer before the checkpoint barrier is emitted.
     *
     * @param j the checkpoint id supplied by the runtime; not used here
     * @throws Exception if flushing the buffer fails
     */
    @Override // org.apache.flink.streaming.api.operators.AbstractStreamOperator, org.apache.flink.streaming.api.operators.StreamOperator
    public void prepareSnapshotPreBarrier(long j) throws Exception {
        this.windowBuffer.flush();
    }

    /**
     * Closes the superclass and then the window buffer.
     *
     * <p>The buffer is closed even when {@code super.close()} throws, so that its resources are
     * not leaked. The first failure is rethrown; a failure from the second step is attached to it
     * as a suppressed exception.
     *
     * @throws Exception the first failure raised while closing
     */
    @Override // org.apache.flink.streaming.api.operators.AbstractStreamOperator, org.apache.flink.streaming.api.operators.StreamOperator
    public void close() throws Exception {
        Exception failure = null;
        try {
            super.close();
        } catch (Exception e) {
            // Remember the failure but keep going so the buffer still gets released.
            failure = e;
        }
        this.collector = null;
        if (this.windowBuffer != null) {
            try {
                this.windowBuffer.close();
            } catch (Exception e) {
                if (failure == null) {
                    failure = e;
                } else {
                    failure.addSuppressed(e);
                }
            }
        }
        if (failure != null) {
            throw failure;
        }
    }

    /**
     * Computes the managed memory available to this operator for the {@link
     * ManagedMemoryUseCase#OPERATOR} use case.
     *
     * @return the memory size computed by the task's memory manager from the operator's configured
     *     managed memory fraction
     */
    private long computeMemorySize() {
        final Environment environment = getContainingTask().getEnvironment();
        return environment
                .getMemoryManager()
                .computeMemorySize(
                        getOperatorConfig()
                                .getManagedMemoryFractionOperatorUseCaseOfSlot(
                                        ManagedMemoryUseCase.OPERATOR,
                                        environment.getTaskManagerInfo().getConfiguration(),
                                        environment.getUserCodeClassLoader().asClassLoader()));
    }
}
