package org.apache.flink.runtime.io.network;

import java.io.IOException;
import java.util.Arrays;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor;
import org.apache.flink.runtime.deployment.ResultPartitionDeploymentDescriptor;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.executiongraph.PartitionInfo;
import org.apache.flink.runtime.io.disk.BatchShuffleReadBufferPool;
import org.apache.flink.runtime.io.disk.FileChannelManager;
import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool;
import org.apache.flink.runtime.io.network.metrics.NettyShuffleMetricFactory;
import org.apache.flink.runtime.io.network.partition.PartitionProducerStateProvider;
import org.apache.flink.runtime.io.network.partition.ResultPartition;
import org.apache.flink.runtime.io.network.partition.ResultPartitionFactory;
import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
import org.apache.flink.runtime.io.network.partition.ResultPartitionManager;
import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
import org.apache.flink.runtime.io.network.partition.consumer.InputGateID;
import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate;
import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGateFactory;
import org.apache.flink.runtime.shuffle.NettyShuffleDescriptor;
import org.apache.flink.runtime.shuffle.ShuffleDescriptor;
import org.apache.flink.runtime.shuffle.ShuffleEnvironment;
import org.apache.flink.runtime.shuffle.ShuffleIOOwnerContext;
import org.apache.flink.runtime.taskmanager.NettyShuffleEnvironmentConfiguration;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/runtime/io/network/NettyShuffleEnvironment.class */
public class NettyShuffleEnvironment implements ShuffleEnvironment<ResultPartition, SingleInputGate> {
    private static final Logger LOG = LoggerFactory.getLogger(NettyShuffleEnvironment.class);
    private final ResourceID taskExecutorResourceId;
    private final NettyShuffleEnvironmentConfiguration config;
    private final NetworkBufferPool networkBufferPool;
    private final ConnectionManager connectionManager;
    private final ResultPartitionManager resultPartitionManager;
    private final FileChannelManager fileChannelManager;
    private final ResultPartitionFactory resultPartitionFactory;
    private final SingleInputGateFactory singleInputGateFactory;
    private final Executor ioExecutor;
    private final BatchShuffleReadBufferPool batchShuffleReadBufferPool;
    private final ScheduledExecutorService batchShuffleReadIOExecutor;
    private final Object lock = new Object();
    private final Map<InputGateID, Set<SingleInputGate>> inputGatesById = new ConcurrentHashMap(10);
    private boolean isClosed = false;

    /* JADX INFO: Access modifiers changed from: package-private */
    public NettyShuffleEnvironment(ResourceID resourceID, NettyShuffleEnvironmentConfiguration nettyShuffleEnvironmentConfiguration, NetworkBufferPool networkBufferPool, ConnectionManager connectionManager, ResultPartitionManager resultPartitionManager, FileChannelManager fileChannelManager, ResultPartitionFactory resultPartitionFactory, SingleInputGateFactory singleInputGateFactory, Executor executor, BatchShuffleReadBufferPool batchShuffleReadBufferPool, ScheduledExecutorService scheduledExecutorService) {
        this.taskExecutorResourceId = resourceID;
        this.config = nettyShuffleEnvironmentConfiguration;
        this.networkBufferPool = networkBufferPool;
        this.connectionManager = connectionManager;
        this.resultPartitionManager = resultPartitionManager;
        this.fileChannelManager = fileChannelManager;
        this.resultPartitionFactory = resultPartitionFactory;
        this.singleInputGateFactory = singleInputGateFactory;
        this.ioExecutor = executor;
        this.batchShuffleReadBufferPool = batchShuffleReadBufferPool;
        this.batchShuffleReadIOExecutor = scheduledExecutorService;
    }

    @VisibleForTesting
    public ResultPartitionManager getResultPartitionManager() {
        return this.resultPartitionManager;
    }

    @VisibleForTesting
    public ConnectionManager getConnectionManager() {
        return this.connectionManager;
    }

    @VisibleForTesting
    public NetworkBufferPool getNetworkBufferPool() {
        return this.networkBufferPool;
    }

    @VisibleForTesting
    public BatchShuffleReadBufferPool getBatchShuffleReadBufferPool() {
        return this.batchShuffleReadBufferPool;
    }

    @VisibleForTesting
    public ScheduledExecutorService getBatchShuffleReadIOExecutor() {
        return this.batchShuffleReadIOExecutor;
    }

    @VisibleForTesting
    public NettyShuffleEnvironmentConfiguration getConfiguration() {
        return this.config;
    }

    @VisibleForTesting
    public Optional<Collection<SingleInputGate>> getInputGate(InputGateID inputGateID) {
        return Optional.ofNullable(this.inputGatesById.get(inputGateID));
    }

    @Override // org.apache.flink.runtime.shuffle.ShuffleEnvironment
    public void releasePartitionsLocally(Collection<ResultPartitionID> collection) {
        this.ioExecutor.execute(() -> {
            Iterator it = collection.iterator();
            while (it.hasNext()) {
                this.resultPartitionManager.releasePartition((ResultPartitionID) it.next(), null);
            }
        });
    }

    @Override // org.apache.flink.runtime.shuffle.ShuffleEnvironment
    public Collection<ResultPartitionID> getPartitionsOccupyingLocalResources() {
        return this.resultPartitionManager.getUnreleasedPartitions();
    }

    @Override // org.apache.flink.runtime.shuffle.ShuffleEnvironment
    public ShuffleIOOwnerContext createShuffleIOOwnerContext(String str, ExecutionAttemptID executionAttemptID, MetricGroup metricGroup) {
        MetricGroup createShuffleIOOwnerMetricGroup = NettyShuffleMetricFactory.createShuffleIOOwnerMetricGroup((MetricGroup) Preconditions.checkNotNull(metricGroup));
        return new ShuffleIOOwnerContext((String) Preconditions.checkNotNull(str), (ExecutionAttemptID) Preconditions.checkNotNull(executionAttemptID), metricGroup, createShuffleIOOwnerMetricGroup.addGroup(NettyShuffleMetricFactory.METRIC_GROUP_OUTPUT), createShuffleIOOwnerMetricGroup.addGroup(NettyShuffleMetricFactory.METRIC_GROUP_INPUT));
    }

    @Override // org.apache.flink.runtime.shuffle.ShuffleEnvironment
    public List<ResultPartition> createResultPartitionWriters(ShuffleIOOwnerContext shuffleIOOwnerContext, List<ResultPartitionDeploymentDescriptor> list) {
        List<ResultPartition> asList;
        synchronized (this.lock) {
            Preconditions.checkState(!this.isClosed, "The NettyShuffleEnvironment has already been shut down.");
            ResultPartition[] resultPartitionArr = new ResultPartition[list.size()];
            for (int i = 0; i < resultPartitionArr.length; i++) {
                resultPartitionArr[i] = this.resultPartitionFactory.create(shuffleIOOwnerContext.getOwnerName(), i, list.get(i));
            }
            NettyShuffleMetricFactory.registerOutputMetrics(this.config.isNetworkDetailedMetrics(), shuffleIOOwnerContext.getOutputGroup(), resultPartitionArr);
            asList = Arrays.asList(resultPartitionArr);
        }
        return asList;
    }

    @Override // org.apache.flink.runtime.shuffle.ShuffleEnvironment
    public List<SingleInputGate> createInputGates(ShuffleIOOwnerContext shuffleIOOwnerContext, PartitionProducerStateProvider partitionProducerStateProvider, List<InputGateDeploymentDescriptor> list) {
        List<SingleInputGate> asList;
        synchronized (this.lock) {
            Preconditions.checkState(!this.isClosed, "The NettyShuffleEnvironment has already been shut down.");
            MetricGroup inputGroup = shuffleIOOwnerContext.getInputGroup();
            SingleInputGate[] singleInputGateArr = new SingleInputGate[list.size()];
            for (int i = 0; i < singleInputGateArr.length; i++) {
                InputGateDeploymentDescriptor inputGateDeploymentDescriptor = list.get(i);
                SingleInputGate create = this.singleInputGateFactory.create(shuffleIOOwnerContext, i, inputGateDeploymentDescriptor, partitionProducerStateProvider);
                InputGateID inputGateID = new InputGateID(inputGateDeploymentDescriptor.getConsumedResultId(), shuffleIOOwnerContext.getExecutionAttemptID());
                Set<SingleInputGate> computeIfAbsent = this.inputGatesById.computeIfAbsent(inputGateID, inputGateID2 -> {
                    return ConcurrentHashMap.newKeySet();
                });
                computeIfAbsent.add(create);
                this.inputGatesById.put(inputGateID, computeIfAbsent);
                create.getCloseFuture().thenRun(() -> {
                    this.inputGatesById.computeIfPresent(inputGateID, (inputGateID3, set) -> {
                        set.remove(create);
                        if (set.isEmpty()) {
                            return null;
                        }
                        return set;
                    });
                });
                singleInputGateArr[i] = create;
            }
            if (this.config.getDebloatConfiguration().isEnabled()) {
                NettyShuffleMetricFactory.registerDebloatingTaskMetrics(singleInputGateArr, shuffleIOOwnerContext.getParentGroup());
            }
            NettyShuffleMetricFactory.registerInputMetrics(this.config.isNetworkDetailedMetrics(), inputGroup, singleInputGateArr);
            asList = Arrays.asList(singleInputGateArr);
        }
        return asList;
    }

    @Deprecated
    public void registerLegacyNetworkMetrics(MetricGroup metricGroup, ResultPartitionWriter[] resultPartitionWriterArr, InputGate[] inputGateArr) {
        NettyShuffleMetricFactory.registerLegacyNetworkMetrics(this.config.isNetworkDetailedMetrics(), metricGroup, resultPartitionWriterArr, inputGateArr);
    }

    @Override // org.apache.flink.runtime.shuffle.ShuffleEnvironment
    public boolean updatePartitionInfo(ExecutionAttemptID executionAttemptID, PartitionInfo partitionInfo) throws IOException, InterruptedException {
        Set<SingleInputGate> set = this.inputGatesById.get(new InputGateID(partitionInfo.getIntermediateDataSetID(), executionAttemptID));
        if (set == null || set.isEmpty()) {
            return false;
        }
        ShuffleDescriptor shuffleDescriptor = partitionInfo.getShuffleDescriptor();
        Preconditions.checkArgument(shuffleDescriptor instanceof NettyShuffleDescriptor, "Tried to update unknown channel with unknown ShuffleDescriptor %s.", shuffleDescriptor.getClass().getName());
        Iterator<SingleInputGate> it = set.iterator();
        while (it.hasNext()) {
            it.next().updateInputChannel(this.taskExecutorResourceId, (NettyShuffleDescriptor) shuffleDescriptor);
        }
        return true;
    }

    @Override // org.apache.flink.runtime.shuffle.ShuffleEnvironment
    public int start() throws IOException {
        int start;
        synchronized (this.lock) {
            Preconditions.checkState(!this.isClosed, "The NettyShuffleEnvironment has already been shut down.");
            LOG.info("Starting the network environment and its components.");
            try {
                LOG.debug("Starting network connection manager");
                start = this.connectionManager.start();
            } catch (IOException e) {
                throw new IOException("Failed to instantiate network connection manager.", e);
            }
        }
        return start;
    }

    @Override // java.lang.AutoCloseable
    public void close() {
        synchronized (this.lock) {
            if (this.isClosed) {
                return;
            }
            LOG.info("Shutting down the network environment and its components.");
            try {
                LOG.debug("Shutting down network connection manager");
                this.connectionManager.shutdown();
            } catch (Throwable th) {
                LOG.warn("Cannot shut down the network connection manager.", th);
            }
            try {
                LOG.debug("Shutting down intermediate result partition manager");
                this.resultPartitionManager.shutdown();
            } catch (Throwable th2) {
                LOG.warn("Cannot shut down the result partition manager.", th2);
            }
            try {
                this.networkBufferPool.destroyAllBufferPools();
            } catch (Throwable th3) {
                LOG.warn("Could not destroy all buffer pools.", th3);
            }
            try {
                this.networkBufferPool.destroy();
            } catch (Throwable th4) {
                LOG.warn("Network buffer pool did not shut down properly.", th4);
            }
            try {
                this.fileChannelManager.close();
            } catch (Throwable th5) {
                LOG.warn("Cannot close the file channel manager properly.", th5);
            }
            try {
                this.batchShuffleReadBufferPool.destroy();
            } catch (Throwable th6) {
                LOG.warn("Cannot shut down batch shuffle read buffer pool properly.", th6);
            }
            try {
                this.batchShuffleReadIOExecutor.shutdown();
            } catch (Throwable th7) {
                LOG.warn("Cannot shut down batch shuffle read IO executor properly.", th7);
            }
            this.isClosed = true;
        }
    }

    public boolean isClosed() {
        boolean z;
        synchronized (this.lock) {
            z = this.isClosed;
        }
        return z;
    }
}
