package org.apache.flink.table.runtime.operators.dynamicfiltering;

import java.util.List;
import java.util.concurrent.CompletableFuture;
import javax.annotation.Nullable;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.operators.coordination.CoordinationRequest;
import org.apache.flink.runtime.operators.coordination.CoordinationRequestHandler;
import org.apache.flink.runtime.operators.coordination.CoordinationResponse;
import org.apache.flink.runtime.operators.coordination.CoordinatorStore;
import org.apache.flink.runtime.operators.coordination.OperatorCoordinator;
import org.apache.flink.runtime.operators.coordination.OperatorEvent;
import org.apache.flink.runtime.operators.coordination.RecreateOnResetOperatorCoordinator;
import org.apache.flink.runtime.source.event.SourceEventWrapper;
import org.apache.flink.table.connector.source.DynamicFilteringData;
import org.apache.flink.table.connector.source.DynamicFilteringEvent;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Operator coordinator that receives a {@link DynamicFilteringEvent} from its operator and hands
 * it over to a configured set of listeners through the shared {@link CoordinatorStore}.
 *
 * <p>For every listener ID the store entry is treated as one of:
 *
 * <ul>
 *   <li>absent, or an {@link OperatorEvent}: the new event is stored under the listener ID;
 *   <li>an {@link OperatorCoordinator}: the event is delivered to that coordinator directly and
 *       the store entry is removed.
 * </ul>
 *
 * <p>Only the first received {@link DynamicFilteringData} is distributed. A later event carrying
 * equal data is ignored; a later event carrying different data fails with an {@link
 * IllegalStateException}.
 *
 * <p>All lifecycle and checkpoint related callbacks are no-ops in this class.
 */
public class DynamicFilteringDataCollectorOperatorCoordinator
        implements OperatorCoordinator, CoordinationRequestHandler {

    private static final Logger LOG =
            LoggerFactory.getLogger(DynamicFilteringDataCollectorOperatorCoordinator.class);

    /** Store obtained from the coordinator context; used to reach the listeners. */
    private final CoordinatorStore coordinatorStore;

    /** Keys in {@link #coordinatorStore} under which the listeners are looked up. */
    private final List<String> dynamicFilteringDataListenerIDs;

    /** The first filtering data received so far, or {@code null} if none has arrived yet. */
    private DynamicFilteringData receivedFilteringData;

    /** Provider that creates {@link DynamicFilteringDataCollectorOperatorCoordinator} instances. */
    public static class Provider extends RecreateOnResetOperatorCoordinator.Provider {
        private final List<String> dynamicFilteringDataListenerIDs;

        /**
         * @param operatorID ID of the operator this coordinator belongs to
         * @param dynamicFilteringDataListenerIDs store keys of the listeners; must not be null
         */
        public Provider(OperatorID operatorID, List<String> dynamicFilteringDataListenerIDs) {
            super(operatorID);
            this.dynamicFilteringDataListenerIDs =
                    Preconditions.checkNotNull(dynamicFilteringDataListenerIDs);
        }

        @Override
        protected OperatorCoordinator getCoordinator(OperatorCoordinator.Context context)
                throws Exception {
            return new DynamicFilteringDataCollectorOperatorCoordinator(
                    context, dynamicFilteringDataListenerIDs);
        }
    }

    /**
     * @param context coordinator context; its coordinator store must not be null
     * @param dynamicFilteringDataListenerIDs store keys of the listeners; must not be null
     */
    public DynamicFilteringDataCollectorOperatorCoordinator(
            OperatorCoordinator.Context context, List<String> dynamicFilteringDataListenerIDs) {
        this.coordinatorStore = Preconditions.checkNotNull(context.getCoordinatorStore());
        this.dynamicFilteringDataListenerIDs =
                Preconditions.checkNotNull(dynamicFilteringDataListenerIDs);
    }

    /** No-op. */
    @Override
    public void start() throws Exception {}

    /** No-op. */
    @Override
    public void close() throws Exception {}

    /**
     * Records the filtering data carried by {@code event} and forwards the event to every
     * configured listener.
     *
     * @param subtask not used by this implementation
     * @param attemptNumber not used by this implementation
     * @param event expected to be a {@link SourceEventWrapper} around a {@link
     *     DynamicFilteringEvent}; anything else fails with a {@link ClassCastException}
     * @throws IllegalStateException if filtering data was already received and the new data is
     *     not equal to it
     */
    @Override
    public void handleEventFromOperator(int subtask, int attemptNumber, OperatorEvent event) {
        SourceEventWrapper wrapper = (SourceEventWrapper) event;
        DynamicFilteringData currentData =
                ((DynamicFilteringEvent) wrapper.getSourceEvent()).getData();

        if (receivedFilteringData != null) {
            if (DynamicFilteringData.isEqual(receivedFilteringData, currentData)) {
                // Same data as before: the listeners were already served, nothing to resend.
                return;
            }
            // The data differs from what was already distributed, so the listeners may have
            // acted on data that no longer matches; fail loudly rather than continue.
            throw new IllegalStateException(
                    "DynamicFilteringData is recomputed but not equal. "
                            + "Triggering global failover in case the result is incorrect. "
                            + " It's recommended to re-run the job with dynamic filtering disabled.");
        }

        // Remember the data before distributing so that any later event is compared against it.
        receivedFilteringData = currentData;

        for (String listenerId : dynamicFilteringDataListenerIDs) {
            coordinatorStore.compute(
                    listenerId, (key, existing) -> publishToListener(listenerId, existing, event));
        }
    }

    /**
     * Computes the new store value for one listener ID, delivering the event on the way if a
     * coordinator is already registered.
     *
     * @param listenerId store key of the listener
     * @param existing value currently stored under {@code listenerId}, may be {@code null}
     * @param event the event to publish
     * @return {@code event} if it has to be kept in the store, or {@code null} once it was
     *     delivered to a registered coordinator
     */
    private static Object publishToListener(
            String listenerId, Object existing, OperatorEvent event) {
        if (existing == null || existing instanceof OperatorEvent) {
            // No coordinator stored under this ID: keep (or overwrite with) the latest event.
            LOG.info(
                    "Updating event {} before the source coordinator with ID {} is registered",
                    event,
                    listenerId);
            return event;
        }

        // Template overload: the message (and existing.toString()) is only built on failure.
        Preconditions.checkState(
                existing instanceof OperatorCoordinator,
                "The existing value for %s is expected to be an operator coordinator, "
                        + "but it is in fact %s",
                listenerId,
                existing);
        LOG.info("Distributing event {} to source coordinator with ID {}", event, listenerId);
        try {
            // Subtask index and attempt number are passed as fixed zeros.
            ((OperatorCoordinator) existing).handleEventFromOperator(0, 0, event);
        } catch (Exception e) {
            ExceptionUtils.rethrow(e);
        }
        // Event delivered; returning null makes compute() drop the entry for this listener.
        return null;
    }

    /**
     * Coordination requests are not supported.
     *
     * @throws UnsupportedOperationException always
     */
    @Override
    public CompletableFuture<CoordinationResponse> handleCoordinationRequest(
            CoordinationRequest request) {
        throw new UnsupportedOperationException();
    }

    /** No-op. */
    @Override
    public void subtaskReset(int subtask, long checkpointId) {}

    /** No-op. */
    @Override
    public void executionAttemptFailed(
            int subtask, int attemptNumber, @Nullable Throwable reason) {}

    /** No-op. */
    @Override
    public void executionAttemptReady(
            int subtask, int attemptNumber, OperatorCoordinator.SubtaskGateway gateway) {}

    /**
     * No-op.
     *
     * <p>NOTE(review): {@code result} is never completed here. Presumably this coordinator is
     * only used where checkpoints are not taken; confirm before using it elsewhere.
     */
    @Override
    public void checkpointCoordinator(long checkpointId, CompletableFuture<byte[]> result)
            throws Exception {}

    /** No-op. */
    @Override
    public void notifyCheckpointComplete(long checkpointId) {}

    /** No-op. */
    @Override
    public void resetToCheckpoint(long checkpointId, @Nullable byte[] checkpointData)
            throws Exception {}
}
