package org.apache.flink.runtime.scheduler.adaptivebatch;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import javax.annotation.Nullable;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.metrics.SimpleCounter;
import org.apache.flink.runtime.blocklist.BlockedNode;
import org.apache.flink.runtime.blocklist.BlocklistOperations;
import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory;
import org.apache.flink.runtime.checkpoint.CheckpointsCleaner;
import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.executiongraph.Execution;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.executiongraph.JobStatusListener;
import org.apache.flink.runtime.executiongraph.SpeculativeExecutionVertex;
import org.apache.flink.runtime.executiongraph.failover.flip1.FailoverStrategy;
import org.apache.flink.runtime.executiongraph.failover.flip1.FailureHandlingResult;
import org.apache.flink.runtime.executiongraph.failover.flip1.RestartBackoffTimeStrategy;
import org.apache.flink.runtime.io.network.partition.PartitionException;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.metrics.MetricNames;
import org.apache.flink.runtime.metrics.groups.JobManagerJobMetricGroup;
import org.apache.flink.runtime.scheduler.ExecutionGraphFactory;
import org.apache.flink.runtime.scheduler.ExecutionOperations;
import org.apache.flink.runtime.scheduler.ExecutionSlotAllocatorFactory;
import org.apache.flink.runtime.scheduler.ExecutionVertexVersioner;
import org.apache.flink.runtime.scheduler.slowtaskdetector.ExecutionTimeBasedSlowTaskDetector;
import org.apache.flink.runtime.scheduler.slowtaskdetector.SlowTaskDetector;
import org.apache.flink.runtime.scheduler.slowtaskdetector.SlowTaskDetectorListener;
import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
import org.apache.flink.runtime.scheduler.strategy.SchedulingStrategyFactory;
import org.apache.flink.runtime.shuffle.ShuffleMaster;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.concurrent.FutureUtils;
import org.apache.flink.util.concurrent.ScheduledExecutor;
import org.slf4j.Logger;

/* loaded from: input_file:org/apache/flink/runtime/scheduler/adaptivebatch/SpeculativeScheduler.class */
public class SpeculativeScheduler extends AdaptiveBatchScheduler implements SlowTaskDetectorListener {
    private final int maxConcurrentExecutions;
    private final Duration blockSlowNodeDuration;
    private final BlocklistOperations blocklistOperations;
    private final SlowTaskDetector slowTaskDetector;
    private long numSlowExecutionVertices;
    private final Counter numEffectiveSpeculativeExecutionsCounter;

    public SpeculativeScheduler(Logger logger, JobGraph jobGraph, Executor executor, Configuration configuration, Consumer<ComponentMainThreadExecutor> consumer, ScheduledExecutor scheduledExecutor, ClassLoader classLoader, CheckpointsCleaner checkpointsCleaner, CheckpointRecoveryFactory checkpointRecoveryFactory, JobManagerJobMetricGroup jobManagerJobMetricGroup, SchedulingStrategyFactory schedulingStrategyFactory, FailoverStrategy.Factory factory, RestartBackoffTimeStrategy restartBackoffTimeStrategy, ExecutionOperations executionOperations, ExecutionVertexVersioner executionVertexVersioner, ExecutionSlotAllocatorFactory executionSlotAllocatorFactory, long j, ComponentMainThreadExecutor componentMainThreadExecutor, JobStatusListener jobStatusListener, ExecutionGraphFactory executionGraphFactory, ShuffleMaster<?> shuffleMaster, Time time, VertexParallelismDecider vertexParallelismDecider, int i, BlocklistOperations blocklistOperations) throws Exception {
        super(logger, jobGraph, executor, configuration, consumer, scheduledExecutor, classLoader, checkpointsCleaner, checkpointRecoveryFactory, jobManagerJobMetricGroup, schedulingStrategyFactory, factory, restartBackoffTimeStrategy, executionOperations, executionVertexVersioner, executionSlotAllocatorFactory, j, componentMainThreadExecutor, jobStatusListener, executionGraphFactory, shuffleMaster, time, vertexParallelismDecider, i);
        this.maxConcurrentExecutions = configuration.getInteger(JobManagerOptions.SPECULATIVE_MAX_CONCURRENT_EXECUTIONS);
        this.blockSlowNodeDuration = (Duration) configuration.get(JobManagerOptions.BLOCK_SLOW_NODE_DURATION);
        Preconditions.checkArgument(!this.blockSlowNodeDuration.isNegative(), "The blocking duration should not be negative.");
        this.blocklistOperations = (BlocklistOperations) Preconditions.checkNotNull(blocklistOperations);
        this.slowTaskDetector = new ExecutionTimeBasedSlowTaskDetector(configuration);
        this.numEffectiveSpeculativeExecutionsCounter = new SimpleCounter();
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // org.apache.flink.runtime.scheduler.adaptivebatch.AdaptiveBatchScheduler, org.apache.flink.runtime.scheduler.DefaultScheduler, org.apache.flink.runtime.scheduler.SchedulerBase
    public void startSchedulingInternal() {
        registerMetrics(this.jobManagerJobMetricGroup);
        super.startSchedulingInternal();
        this.slowTaskDetector.start(getExecutionGraph(), this, getMainThreadExecutor());
    }

    private void registerMetrics(MetricGroup metricGroup) {
        metricGroup.gauge(MetricNames.NUM_SLOW_EXECUTION_VERTICES, () -> {
            return Long.valueOf(this.numSlowExecutionVertices);
        });
        metricGroup.counter(MetricNames.NUM_EFFECTIVE_SPECULATIVE_EXECUTIONS, this.numEffectiveSpeculativeExecutionsCounter);
    }

    @Override // org.apache.flink.runtime.scheduler.SchedulerBase, org.apache.flink.util.AutoCloseableAsync
    public CompletableFuture<Void> closeAsync() {
        this.slowTaskDetector.stop();
        return super.closeAsync();
    }

    @Override // org.apache.flink.runtime.scheduler.SchedulerBase
    public SpeculativeExecutionVertex getExecutionVertex(ExecutionVertexID executionVertexID) {
        return (SpeculativeExecutionVertex) super.getExecutionVertex(executionVertexID);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // org.apache.flink.runtime.scheduler.adaptivebatch.AdaptiveBatchScheduler, org.apache.flink.runtime.scheduler.DefaultScheduler, org.apache.flink.runtime.scheduler.SchedulerBase
    public void onTaskFinished(Execution execution) {
        if (!isOriginalAttempt(execution)) {
            this.numEffectiveSpeculativeExecutionsCounter.inc();
        }
        FutureUtils.assertNoException(cancelPendingExecutions(execution.getVertex().getID()));
        super.onTaskFinished(execution);
    }

    private static boolean isOriginalAttempt(Execution execution) {
        return ((SpeculativeExecutionVertex) execution.getVertex()).isOriginalAttempt(execution.getAttemptNumber());
    }

    private CompletableFuture<?> cancelPendingExecutions(ExecutionVertexID executionVertexID) {
        List list = (List) getExecutionVertex(executionVertexID).getCurrentExecutions().stream().filter(execution -> {
            return (execution.getState().isTerminal() || execution.getState() == ExecutionState.CANCELING) ? false : true;
        }).collect(Collectors.toList());
        if (list.isEmpty()) {
            return CompletableFuture.completedFuture(null);
        }
        this.log.info("Canceling {} un-finished executions of {} because one of its executions has finished.", Integer.valueOf(list.size()), executionVertexID);
        FutureUtils.ConjunctFuture combineAll = FutureUtils.combineAll((Collection) list.stream().map(this::cancelExecution).collect(Collectors.toList()));
        cancelAllPendingSlotRequestsForVertex(executionVertexID);
        return combineAll;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // org.apache.flink.runtime.scheduler.DefaultScheduler, org.apache.flink.runtime.scheduler.SchedulerBase
    public void onTaskFailed(Execution execution) {
        getExecutionVertex(execution.getVertex().getID()).archiveFailedExecution(execution.getAttemptId());
        super.onTaskFailed(execution);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // org.apache.flink.runtime.scheduler.DefaultScheduler
    public void handleTaskFailure(Execution execution, @Nullable Throwable th) {
        if (!isExecutionVertexPossibleToFinish(getExecutionVertex(execution.getVertex().getID())) || ExceptionUtils.findThrowable(th, PartitionException.class).isPresent()) {
            super.handleTaskFailure(execution, th);
        } else {
            handleLocalExecutionAttemptFailure(execution, th);
        }
    }

    private void handleLocalExecutionAttemptFailure(Execution execution, @Nullable Throwable th) {
        this.executionSlotAllocator.cancel(execution.getAttemptId());
        FailureHandlingResult recordTaskFailure = recordTaskFailure(execution, th);
        if (recordTaskFailure.canRestart()) {
            archiveFromFailureHandlingResult(createFailureHandlingResultSnapshot(recordTaskFailure));
        } else {
            failJob(th, recordTaskFailure.getTimestamp());
        }
    }

    private static boolean isExecutionVertexPossibleToFinish(SpeculativeExecutionVertex speculativeExecutionVertex) {
        boolean z = false;
        for (Execution execution : speculativeExecutionVertex.getCurrentExecutions()) {
            Preconditions.checkState(execution.getState() != ExecutionState.FINISHED);
            if (execution.getState() == ExecutionState.CREATED || execution.getState() == ExecutionState.SCHEDULED || execution.getState() == ExecutionState.DEPLOYING || execution.getState() == ExecutionState.INITIALIZING || execution.getState() == ExecutionState.RUNNING) {
                z = true;
            }
        }
        return z;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // org.apache.flink.runtime.scheduler.SchedulerBase
    public void resetForNewExecution(ExecutionVertexID executionVertexID) {
        Execution currentExecutionAttempt = getExecutionVertex(executionVertexID).getCurrentExecutionAttempt();
        if (currentExecutionAttempt.getState() == ExecutionState.FINISHED && !isOriginalAttempt(currentExecutionAttempt)) {
            this.numEffectiveSpeculativeExecutionsCounter.dec();
        }
        super.resetForNewExecution(executionVertexID);
    }

    @Override // org.apache.flink.runtime.scheduler.slowtaskdetector.SlowTaskDetectorListener
    public void notifySlowTasks(Map<ExecutionVertexID, Collection<ExecutionAttemptID>> map) {
        int size;
        long currentTimeMillis = System.currentTimeMillis();
        this.numSlowExecutionVertices = map.size();
        blockSlowNodes(map, currentTimeMillis);
        ArrayList arrayList = new ArrayList();
        HashSet hashSet = new HashSet();
        for (ExecutionVertexID executionVertexID : map.keySet()) {
            SpeculativeExecutionVertex executionVertex = getExecutionVertex(executionVertexID);
            if (!executionVertex.containsSinks() && (size = this.maxConcurrentExecutions - executionVertex.getCurrentExecutions().size()) > 0) {
                this.log.info("{} ({}) is detected as a slow vertex, create and deploy {} new speculative executions for it.", new Object[]{executionVertex.getTaskNameWithSubtaskIndex(), executionVertex.getID(), Integer.valueOf(size)});
                Collection<Execution> collection = (Collection) IntStream.range(0, size).mapToObj(i -> {
                    return executionVertex.createNewSpeculativeExecution(currentTimeMillis);
                }).collect(Collectors.toList());
                setupSubtaskGatewayForAttempts(executionVertex, collection);
                hashSet.add(executionVertexID);
                arrayList.addAll(collection);
            }
        }
        this.executionDeployer.allocateSlotsAndDeploy(arrayList, this.executionVertexVersioner.getExecutionVertexVersions(hashSet));
    }

    private void blockSlowNodes(Map<ExecutionVertexID, Collection<ExecutionAttemptID>> map, long j) {
        if (this.blockSlowNodeDuration.isZero()) {
            return;
        }
        long millis = j + this.blockSlowNodeDuration.toMillis();
        this.blocklistOperations.addNewBlockedNodes((Collection) getSlowNodeIds(map).stream().map(str -> {
            return new BlockedNode(str, "Node is detected to be slow.", millis);
        }).collect(Collectors.toList()));
    }

    private Set<String> getSlowNodeIds(Map<ExecutionVertexID, Collection<ExecutionAttemptID>> map) {
        return (Set) ((Set) map.values().stream().flatMap((v0) -> {
            return v0.stream();
        }).collect(Collectors.toSet())).stream().map(executionAttemptID -> {
            return getExecutionGraph().getRegisteredExecutions().get(executionAttemptID);
        }).map(execution -> {
            Preconditions.checkNotNull(execution.getAssignedResource(), "The reported slow node have not been assigned a slot. This is unexpected and indicates that there is something wrong with the slow task detector.");
            return execution.getAssignedResourceLocation();
        }).map((v0) -> {
            return v0.getNodeId();
        }).collect(Collectors.toSet());
    }

    private void setupSubtaskGatewayForAttempts(SpeculativeExecutionVertex speculativeExecutionVertex, Collection<Execution> collection) {
        Set set = (Set) collection.stream().map((v0) -> {
            return v0.getAttemptNumber();
        }).collect(Collectors.toSet());
        speculativeExecutionVertex.getJobVertex().getOperatorCoordinators().forEach(operatorCoordinatorHolder -> {
            operatorCoordinatorHolder.setupSubtaskGatewayForAttempts(speculativeExecutionVertex.getParallelSubtaskIndex(), set);
        });
    }

    @VisibleForTesting
    long getNumSlowExecutionVertices() {
        return this.numSlowExecutionVertices;
    }

    @VisibleForTesting
    long getNumEffectiveSpeculativeExecutions() {
        return this.numEffectiveSpeculativeExecutionsCounter.getCount();
    }
}
