/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.minicluster;

import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.URI;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import javax.annotation.concurrent.GuardedBy;
import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.api.common.JobSubmissionResult;
import org.apache.flink.api.common.io.FileOutputFormat;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.configuration.ClusterOptions;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.ConfigurationUtils;
import org.apache.flink.configuration.IllegalConfigurationException;
import org.apache.flink.configuration.UnmodifiableConfiguration;
import org.apache.flink.core.execution.SavepointFormatType;
import org.apache.flink.runtime.blob.BlobCacheService;
import org.apache.flink.runtime.blob.BlobClient;
import org.apache.flink.runtime.blob.BlobServer;
import org.apache.flink.runtime.blob.BlobUtils;
import org.apache.flink.runtime.client.ClientUtils;
import org.apache.flink.runtime.client.JobExecutionException;
import org.apache.flink.runtime.client.JobStatusMessage;
import org.apache.flink.runtime.clusterframework.ApplicationStatus;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.dispatcher.DispatcherGateway;
import org.apache.flink.runtime.dispatcher.DispatcherId;
import org.apache.flink.runtime.dispatcher.MemoryExecutionGraphInfoStore;
import org.apache.flink.runtime.dispatcher.TriggerSavepointMode;
import org.apache.flink.runtime.entrypoint.ClusterEntrypointUtils;
import org.apache.flink.runtime.entrypoint.ClusterInformation;
import org.apache.flink.runtime.entrypoint.WorkingDirectory;
import org.apache.flink.runtime.entrypoint.component.DefaultDispatcherResourceManagerComponentFactory;
import org.apache.flink.runtime.entrypoint.component.DispatcherResourceManagerComponent;
import org.apache.flink.runtime.entrypoint.component.DispatcherResourceManagerComponentFactory;
import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
import org.apache.flink.runtime.externalresource.ExternalResourceInfoProvider;
import org.apache.flink.runtime.heartbeat.HeartbeatServices;
import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
import org.apache.flink.runtime.highavailability.HighAvailabilityServicesFactory;
import org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils;
import org.apache.flink.runtime.highavailability.nonha.embedded.EmbeddedHaServices;
import org.apache.flink.runtime.highavailability.nonha.embedded.EmbeddedHaServicesWithLeadershipControl;
import org.apache.flink.runtime.highavailability.nonha.embedded.HaLeadershipControl;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.jobgraph.RestoreMode;
import org.apache.flink.runtime.jobgraph.SavepointConfigOptions;
import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
import org.apache.flink.runtime.jobmanager.HighAvailabilityMode;
import org.apache.flink.runtime.jobmaster.JobResult;
import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
import org.apache.flink.runtime.messages.Acknowledge;
import org.apache.flink.runtime.messages.webmonitor.ClusterOverview;
import org.apache.flink.runtime.metrics.MetricRegistry;
import org.apache.flink.runtime.metrics.MetricRegistryConfiguration;
import org.apache.flink.runtime.metrics.MetricRegistryImpl;
import org.apache.flink.runtime.metrics.ReporterSetup;
import org.apache.flink.runtime.metrics.groups.ProcessMetricGroup;
import org.apache.flink.runtime.metrics.util.MetricUtils;
import org.apache.flink.runtime.minicluster.MiniClusterConfiguration;
import org.apache.flink.runtime.minicluster.RpcServiceSharing;
import org.apache.flink.runtime.operators.coordination.CoordinationRequest;
import org.apache.flink.runtime.operators.coordination.CoordinationResponse;
import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
import org.apache.flink.runtime.resourcemanager.ResourceManagerId;
import org.apache.flink.runtime.resourcemanager.ResourceOverview;
import org.apache.flink.runtime.resourcemanager.StandaloneResourceManagerFactory;
import org.apache.flink.runtime.rpc.FatalErrorHandler;
import org.apache.flink.runtime.rpc.RpcService;
import org.apache.flink.runtime.rpc.RpcSystem;
import org.apache.flink.runtime.rpc.RpcUtils;
import org.apache.flink.runtime.scheduler.ExecutionGraphInfo;
import org.apache.flink.runtime.taskexecutor.TaskExecutor;
import org.apache.flink.runtime.taskexecutor.TaskManagerRunner;
import org.apache.flink.runtime.webmonitor.retriever.LeaderRetriever;
import org.apache.flink.runtime.webmonitor.retriever.MetricQueryServiceRetriever;
import org.apache.flink.runtime.webmonitor.retriever.impl.RpcGatewayRetriever;
import org.apache.flink.runtime.webmonitor.retriever.impl.RpcMetricQueryServiceRetriever;
import org.apache.flink.util.AutoCloseableAsync;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.ExecutorUtils;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.InstantiationUtil;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.Reference;
import org.apache.flink.util.SerializedValue;
import org.apache.flink.util.concurrent.ExecutorThreadFactory;
import org.apache.flink.util.concurrent.ExponentialBackoffRetryStrategy;
import org.apache.flink.util.concurrent.FutureUtils;
import org.apache.flink.util.function.BiFunctionWithException;
import org.apache.flink.util.function.FunctionUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class MiniCluster
implements AutoCloseableAsync {
    /** Logger for this class. */
    private static final Logger LOG = LoggerFactory.getLogger(MiniCluster.class);
    /** Monitor guarding every field annotated with {@code @GuardedBy("lock")}. */
    private final Object lock = new Object();
    /** Configuration this mini cluster was created with. */
    private final MiniClusterConfiguration miniClusterConfiguration;
    /** Timeout handed to gateway calls; set to {@code RpcUtils.INF_TIMEOUT} in the constructor. */
    private final Time rpcTimeout;
    /** Task executors started via {@code startTaskManager()}, in start order. */
    @GuardedBy(value="lock")
    private final List<TaskExecutor> taskManagers;
    /** Creates the per-TaskManager fatal error handler, keyed by the TaskManager's index. */
    private final TerminatingFatalErrorHandlerFactory taskManagerTerminatingFatalErrorHandlerFactory = new TerminatingFatalErrorHandlerFactory();
    /** Supplies the RPC system reference that {@code start()} stores in {@code rpcSystem}. */
    private final Supplier<Reference<RpcSystem>> rpcSystemSupplier;
    /**
     * Future returned by {@code closeAsync()}. Starts out already completed, is replaced by a
     * fresh future at the end of a successful {@code start()}, and is completed by the teardown
     * chain built in {@code closeAsync()}.
     */
    private CompletableFuture<Void> terminationFuture;
    // ---- services created by start(); all null until then ----
    @GuardedBy(value="lock")
    private MetricRegistryImpl metricRegistry;
    @GuardedBy(value="lock")
    private ProcessMetricGroup processMetricGroup;
    /** Local RPC service in SHARED mode, otherwise a remote one bound to the JobManager address. */
    @GuardedBy(value="lock")
    private RpcService commonRpcService;
    /** Fixed thread pool ("mini-cluster-io") created in {@code start()}. */
    @GuardedBy(value="lock")
    private ExecutorService ioExecutor;
    @GuardedBy(value="lock")
    private final Collection<RpcService> rpcServices;
    @GuardedBy(value="lock")
    private HighAvailabilityServicesFactory haServicesFactory;
    @GuardedBy(value="lock")
    private HighAvailabilityServices haServices;
    @GuardedBy(value="lock")
    private BlobServer blobServer;
    @GuardedBy(value="lock")
    private HeartbeatServices heartbeatServices;
    @GuardedBy(value="lock")
    private BlobCacheService blobCacheService;
    // ---- leader retrieval services obtained from haServices in start() ----
    @GuardedBy(value="lock")
    private LeaderRetrievalService resourceManagerLeaderRetriever;
    @GuardedBy(value="lock")
    private LeaderRetrievalService dispatcherLeaderRetriever;
    @GuardedBy(value="lock")
    private LeaderRetrievalService clusterRestEndpointLeaderRetrievalService;
    @GuardedBy(value="lock")
    private Collection<DispatcherResourceManagerComponent> dispatcherResourceManagerComponents;
    // ---- listeners registered with the leader retrieval services above ----
    @GuardedBy(value="lock")
    private RpcGatewayRetriever<DispatcherId, DispatcherGateway> dispatcherGatewayRetriever;
    @GuardedBy(value="lock")
    private RpcGatewayRetriever<ResourceManagerId, ResourceManagerGateway> resourceManagerGatewayRetriever;
    @GuardedBy(value="lock")
    private LeaderRetriever webMonitorLeaderRetriever;
    /** Factory for the RPC service handed to each newly started TaskManager. */
    @GuardedBy(value="lock")
    private RpcServiceFactory taskManagerRpcServiceFactory;
    @GuardedBy(value="lock")
    private WorkingDirectory workingDirectory;
    /** Set to true at the end of a successful {@code start()} and back to false by {@code closeAsync()}. */
    private volatile boolean running;
    @GuardedBy(value="lock")
    private Reference<RpcSystem> rpcSystem;
    /** Flag switched on by {@code overrideRestoreModeForChangelogStateBackend()}; false by default. */
    @Internal
    private boolean overrideRestoreModeForChangelogStateBackend;

    /**
     * Creates a mini cluster whose RPC system is loaded lazily from the given configuration.
     *
     * <p>The RPC system is not loaded here; the supplier passed to the two-argument constructor
     * loads it (as an owned reference) when it is invoked.
     *
     * @param miniClusterConfiguration configuration of the mini cluster
     */
    public MiniCluster(MiniClusterConfiguration miniClusterConfiguration) {
        this(
                miniClusterConfiguration,
                () -> {
                    Configuration rpcConfiguration = miniClusterConfiguration.getConfiguration();
                    return Reference.owned(RpcSystem.load(rpcConfiguration));
                });
    }

    public MiniCluster(MiniClusterConfiguration miniClusterConfiguration, Supplier<Reference<RpcSystem>> rpcSystemSupplier) {
        this.miniClusterConfiguration = Preconditions.checkNotNull(miniClusterConfiguration, "config may not be null");
        this.rpcServices = new ArrayList<RpcService>(3 + miniClusterConfiguration.getNumTaskManagers());
        this.dispatcherResourceManagerComponents = new ArrayList<DispatcherResourceManagerComponent>(1);
        this.rpcTimeout = RpcUtils.INF_TIMEOUT;
        this.terminationFuture = CompletableFuture.completedFuture(null);
        this.running = false;
        this.taskManagers = new ArrayList<TaskExecutor>(miniClusterConfiguration.getNumTaskManagers());
        this.rpcSystemSupplier = rpcSystemSupplier;
    }

    /**
     * Returns a future for the REST address of the current web monitor leader.
     *
     * <p>The address is read from the first field of the leader tuple delivered by the web monitor
     * leader retriever and parsed into a {@link URI}. The state check fails if the cluster is not
     * running.
     *
     * @return future completing with the leader's REST address
     */
    public CompletableFuture<URI> getRestAddress() {
        synchronized (lock) {
            Preconditions.checkState(running, "MiniCluster is not yet running or has already been shut down.");
            return webMonitorLeaderRetriever
                    .getLeaderFuture()
                    .thenApply(
                            FunctionUtils.uncheckedFunction(
                                    leaderTuple -> new URI((String) leaderTuple.f0)));
        }
    }

    /**
     * Returns the cluster information of the running mini cluster.
     *
     * <p>The host name is always {@code "localhost"}; the port is the one reported by the blob
     * server. The state check fails if the cluster is not running.
     *
     * @return cluster information built from "localhost" and the blob server port
     */
    public ClusterInformation getClusterInformation() {
        synchronized (lock) {
            Preconditions.checkState(running, "MiniCluster is not yet running or has already been shut down.");
            final int blobServerPort = blobServer.getPort();
            return new ClusterInformation("localhost", blobServerPort);
        }
    }

    /**
     * Returns the I/O executor of this mini cluster.
     *
     * <p>The field is read without taking {@code lock}; it is {@code null} until
     * {@code start()} has created the executor.
     *
     * @return the I/O executor, possibly {@code null}
     */
    protected Executor getIOExecutor() {
        return ioExecutor;
    }

    /**
     * Reports whether the mini cluster is currently running.
     *
     * @return the current value of the volatile {@code running} flag
     */
    public boolean isRunning() {
        return running;
    }

    /**
     * Starts the mini cluster and all of its services while holding {@code lock}.
     *
     * <p>Start-up order: working directory, I/O format classes, RPC system, metric registry, RPC
     * service(s), metric query service, process metric group, I/O executor, HA services, blob
     * server, heartbeat services, blob cache, TaskManagers, dispatcher/resource-manager
     * components, and finally the leader retrieval services. Only after all of that succeeded is
     * a fresh termination future installed and {@code running} set to true.
     *
     * @throws Exception if any start-up step fails; the original exception is rethrown after a
     *     {@code close()} attempt, with any failure of that attempt attached as suppressed
     */
    public void start() throws Exception {
        Object object = this.lock;
        synchronized (object) {
            Preconditions.checkState(!this.running, "MiniCluster is already running");
            LOG.info("Starting Flink Mini Cluster");
            LOG.debug("Using configuration {}", (Object)this.miniClusterConfiguration);
            UnmodifiableConfiguration configuration = this.miniClusterConfiguration.getConfiguration();
            // SHARED: one local RPC service is reused by all components; otherwise each component
            // gets a dedicated RPC service created from the configured addresses/port ranges.
            boolean useSingleRpcService = this.miniClusterConfiguration.getRpcServiceSharing() == RpcServiceSharing.SHARED;
            try {
                RpcService metricQueryServiceRpcService;
                RpcServiceFactory dispatcherResourceManagerComponentRpcServiceFactory;
                this.workingDirectory = WorkingDirectory.create(ClusterEntrypointUtils.generateWorkingDirectoryFile(configuration, Optional.empty(), "minicluster_" + ResourceID.generate()));
                this.initializeIOFormatClasses(configuration);
                this.rpcSystem = this.rpcSystemSupplier.get();
                LOG.info("Starting Metrics Registry");
                this.metricRegistry = this.createMetricRegistry(configuration, this.rpcSystem.deref().getMaximumMessageSizeInBytes((Configuration)configuration));
                LOG.info("Starting RPC Service(s)");
                if (useSingleRpcService) {
                    // Shared mode: the same factory (wrapping the common service) serves both the
                    // TaskManagers and the dispatcher/resource-manager components.
                    this.commonRpcService = this.createLocalRpcService(configuration, this.rpcSystem.deref());
                    CommonRpcServiceFactory commonRpcServiceFactory = new CommonRpcServiceFactory(this.commonRpcService);
                    this.taskManagerRpcServiceFactory = commonRpcServiceFactory;
                    dispatcherResourceManagerComponentRpcServiceFactory = commonRpcServiceFactory;
                    metricQueryServiceRpcService = MetricUtils.startLocalMetricsRpcService(configuration, this.rpcSystem.deref());
                } else {
                    // Dedicated mode: separate factories for JobManager-side and TaskManager-side
                    // services; the common service is a remote one bound to the JobManager address.
                    String jobManagerExternalAddress = this.miniClusterConfiguration.getJobManagerExternalAddress();
                    String taskManagerExternalAddress = this.miniClusterConfiguration.getTaskManagerExternalAddress();
                    String jobManagerExternalPortRange = this.miniClusterConfiguration.getJobManagerExternalPortRange();
                    String taskManagerExternalPortRange = this.miniClusterConfiguration.getTaskManagerExternalPortRange();
                    String jobManagerBindAddress = this.miniClusterConfiguration.getJobManagerBindAddress();
                    String taskManagerBindAddress = this.miniClusterConfiguration.getTaskManagerBindAddress();
                    dispatcherResourceManagerComponentRpcServiceFactory = new DedicatedRpcServiceFactory(configuration, jobManagerExternalAddress, jobManagerExternalPortRange, jobManagerBindAddress, this.rpcSystem.deref());
                    this.taskManagerRpcServiceFactory = new DedicatedRpcServiceFactory(configuration, taskManagerExternalAddress, taskManagerExternalPortRange, taskManagerBindAddress, this.rpcSystem.deref());
                    this.commonRpcService = this.createRemoteRpcService(configuration, jobManagerBindAddress, 0, this.rpcSystem.deref());
                    metricQueryServiceRpcService = MetricUtils.startRemoteMetricsRpcService(configuration, this.commonRpcService.getAddress(), null, this.rpcSystem.deref());
                }
                this.metricRegistry.startQueryService(metricQueryServiceRpcService, null);
                this.processMetricGroup = MetricUtils.instantiateProcessMetricGroup(this.metricRegistry, RpcUtils.getHostname((RpcService)this.commonRpcService), ConfigurationUtils.getSystemResourceMetricsProbingInterval(configuration));
                this.ioExecutor = Executors.newFixedThreadPool(ClusterEntrypointUtils.getPoolSize(configuration), new ExecutorThreadFactory("mini-cluster-io"));
                // The factory must be assigned first: createHighAvailabilityServices() reads it.
                this.haServicesFactory = this.createHighAvailabilityServicesFactory(configuration);
                this.haServices = this.createHighAvailabilityServices(configuration, this.ioExecutor);
                this.blobServer = BlobUtils.createBlobServer(configuration, Reference.borrowed(this.workingDirectory.getBlobStorageDirectory()), this.haServices.createBlobStore());
                this.blobServer.start();
                this.heartbeatServices = HeartbeatServices.fromConfiguration(configuration);
                // The blob cache connects to the blob server started above via the local host
                // address and the server's port.
                this.blobCacheService = BlobUtils.createBlobCacheService(configuration, Reference.borrowed(this.workingDirectory.getBlobStorageDirectory()), this.haServices.createBlobStore(), new InetSocketAddress(InetAddress.getLocalHost(), this.blobServer.getPort()));
                this.startTaskManagers();
                RpcMetricQueryServiceRetriever metricQueryServiceRetriever = new RpcMetricQueryServiceRetriever(this.metricRegistry.getMetricQueryServiceRpcService());
                this.setupDispatcherResourceManagerComponents(configuration, dispatcherResourceManagerComponentRpcServiceFactory, metricQueryServiceRetriever);
                this.resourceManagerLeaderRetriever = this.haServices.getResourceManagerLeaderRetriever();
                this.dispatcherLeaderRetriever = this.haServices.getDispatcherLeaderRetriever();
                this.clusterRestEndpointLeaderRetrievalService = this.haServices.getClusterRestEndpointLeaderRetriever();
                // Both gateway retrievers use the same retry strategy arguments: 21, 5 ms, 20 ms.
                this.dispatcherGatewayRetriever = new RpcGatewayRetriever<DispatcherId, DispatcherGateway>(this.commonRpcService, DispatcherGateway.class, DispatcherId::fromUuid, new ExponentialBackoffRetryStrategy(21, Duration.ofMillis(5L), Duration.ofMillis(20L)));
                this.resourceManagerGatewayRetriever = new RpcGatewayRetriever<ResourceManagerId, ResourceManagerGateway>(this.commonRpcService, ResourceManagerGateway.class, ResourceManagerId::fromUuid, new ExponentialBackoffRetryStrategy(21, Duration.ofMillis(5L), Duration.ofMillis(20L)));
                this.webMonitorLeaderRetriever = new LeaderRetriever();
                // Register each retriever as the listener of its leader retrieval service.
                this.resourceManagerLeaderRetriever.start(this.resourceManagerGatewayRetriever);
                this.dispatcherLeaderRetriever.start(this.dispatcherGatewayRetriever);
                this.clusterRestEndpointLeaderRetrievalService.start(this.webMonitorLeaderRetriever);
            }
            catch (Exception e) {
                // NOTE(review): `running` is still false here, and closeAsync() only tears
                // services down when `running` is true. If close() delegates to closeAsync()
                // (presumably, via AutoCloseableAsync - confirm), the services created above are
                // not released by this call.
                try {
                    this.close();
                }
                catch (Exception ee) {
                    e.addSuppressed(ee);
                }
                throw e;
            }
            // Replace the initially-completed future so closeAsync() has something to complete.
            this.terminationFuture = new CompletableFuture();
            this.running = true;
            LOG.info("Flink Mini Cluster started successfully");
        }
    }

    /**
     * Creates the dispatcher/resource-manager components, registers them, and arranges for the
     * mini cluster to close once all of their shut-down futures have completed.
     *
     * @param configuration cluster configuration passed on to the component factory
     * @param dispatcherResourceManagerComponentRpcServiceFactory factory for the components' RPC service
     * @param metricQueryServiceRetriever retriever passed on to the components
     * @throws Exception if creating the components fails
     */
    @GuardedBy("lock")
    private void setupDispatcherResourceManagerComponents(Configuration configuration, RpcServiceFactory dispatcherResourceManagerComponentRpcServiceFactory, MetricQueryServiceRetriever metricQueryServiceRetriever) throws Exception {
        final Collection<? extends DispatcherResourceManagerComponent> createdComponents =
                createDispatcherResourceManagerComponents(
                        configuration,
                        dispatcherResourceManagerComponentRpcServiceFactory,
                        blobServer,
                        heartbeatServices,
                        metricRegistry,
                        metricQueryServiceRetriever,
                        new ShutDownFatalErrorHandler());
        dispatcherResourceManagerComponents.addAll(createdComponents);

        // Close the whole mini cluster as soon as every registered component has shut down,
        // regardless of whether the shut-down futures completed normally or exceptionally.
        FutureUtils.completeAll(
                        dispatcherResourceManagerComponents.stream()
                                .map(DispatcherResourceManagerComponent::getShutDownFuture)
                                .collect(Collectors.toList()))
                .whenComplete((ignored, exception) -> closeAsync());
    }

    /**
     * Creates the single dispatcher/resource-manager component of this mini cluster.
     *
     * <p>The component is built by the factory from
     * {@code createDispatcherResourceManagerComponentFactory()} with a freshly generated resource
     * id, a new RPC service from {@code rpcServiceFactory} and a new in-memory execution graph
     * info store. Once the component's shut-down future completes, the application is stopped
     * with the reported status and a {@code null} diagnostics argument.
     *
     * @return a singleton collection holding the created component
     * @throws Exception if the component cannot be created
     */
    @VisibleForTesting
    protected Collection<? extends DispatcherResourceManagerComponent> createDispatcherResourceManagerComponents(Configuration configuration, RpcServiceFactory rpcServiceFactory, BlobServer blobServer, HeartbeatServices heartbeatServices, MetricRegistry metricRegistry, MetricQueryServiceRetriever metricQueryServiceRetriever, FatalErrorHandler fatalErrorHandler) throws Exception {
        final DispatcherResourceManagerComponentFactory componentFactory = createDispatcherResourceManagerComponentFactory();

        final DispatcherResourceManagerComponent component =
                componentFactory.create(
                        configuration,
                        ResourceID.generate(),
                        ioExecutor,
                        rpcServiceFactory.createRpcService(),
                        haServices,
                        blobServer,
                        heartbeatServices,
                        metricRegistry,
                        new MemoryExecutionGraphInfoStore(),
                        metricQueryServiceRetriever,
                        fatalErrorHandler);

        // Stop the application when the component shuts down; a failure while doing so is
        // handed to FutureUtils.assertNoException.
        FutureUtils.assertNoException(
                component
                        .getShutDownFuture()
                        .thenCompose(status -> component.stopApplication((ApplicationStatus) status, null)));

        return Collections.singleton(component);
    }

    /**
     * Creates the factory used to build the dispatcher/resource-manager component.
     *
     * @return a session component factory backed by the standalone resource manager factory
     */
    protected DispatcherResourceManagerComponentFactory createDispatcherResourceManagerComponentFactory() {
        return DefaultDispatcherResourceManagerComponentFactory.createSessionComponentFactory(
                StandaloneResourceManagerFactory.getInstance());
    }

    /**
     * Selects the HA services factory from the mini cluster's HA mode.
     *
     * <ul>
     *   <li>{@code WITH_LEADERSHIP_CONTROL}: embedded HA services with leadership control.
     *   <li>{@code CONFIGURED}: embedded HA services when the configured high-availability mode is
     *       {@code NONE}, otherwise the regular factory.
     * </ul>
     *
     * @param configuration configuration from which the high-availability mode is read
     * @return the factory to create the HA services with
     * @throws IllegalConfigurationException if the mini cluster's HA mode is neither of the above
     */
    private HighAvailabilityServicesFactory createHighAvailabilityServicesFactory(Configuration configuration) {
        final HaServices haServicesMode = miniClusterConfiguration.getHaServices();

        if (haServicesMode == HaServices.WITH_LEADERSHIP_CONTROL) {
            return new SingletonHighAvailabilityServicesFactory(
                    (config, leaderElectionExecutor) ->
                            new EmbeddedHaServicesWithLeadershipControl((Executor) leaderElectionExecutor));
        }

        if (haServicesMode == HaServices.CONFIGURED) {
            if (HighAvailabilityMode.fromConfig(configuration) != HighAvailabilityMode.NONE) {
                return new RegularHighAvailabilityServicesFactory();
            }
            return new SingletonHighAvailabilityServicesFactory(
                    (config, leaderElectionExecutor) ->
                            new EmbeddedHaServices((Executor) leaderElectionExecutor));
        }

        // Anything else, including null, is rejected.
        throw new IllegalConfigurationException("Unknown HA Services Mode configured in MiniCluster configuration: " + haServicesMode);
    }

    /**
     * Creates the HA services by delegating to the previously selected HA services factory.
     *
     * @param configuration configuration handed to the factory
     * @param executor executor handed to the factory
     * @return the HA services produced by the factory
     * @throws Exception if the factory fails to create the services
     */
    @VisibleForTesting
    protected HighAvailabilityServices createHighAvailabilityServices(Configuration configuration, Executor executor) throws Exception {
        return haServicesFactory.createHAServices(configuration, executor);
    }

    /**
     * Returns the leadership control view of the HA services, if they offer one.
     *
     * @return the HA services cast to {@link HaLeadershipControl} when they implement it,
     *     otherwise an empty optional (also when no HA services have been created yet)
     */
    public Optional<HaLeadershipControl> getHaLeadershipControl() {
        synchronized (lock) {
            if (haServices instanceof HaLeadershipControl) {
                return Optional.of((HaLeadershipControl) haServices);
            }
            return Optional.empty();
        }
    }

    /**
     * Returns the HA services of this mini cluster.
     *
     * <p>The field is read without taking {@code lock}; it is {@code null} until
     * {@code start()} has created the services.
     *
     * @return the HA services, possibly {@code null}
     */
    protected HighAvailabilityServices getHaServices() {
        return haServices;
    }

    /**
     * Shuts down the mini cluster asynchronously.
     *
     * <p>If the cluster is running, a teardown chain is built under {@code lock}: TaskManagers and
     * dispatcher/resource-manager components first, then the metric system, the RPC services, the
     * remaining mini cluster services, the executors, and finally directory deletion. The
     * termination future is completed with the outcome of the last step. {@code running} is reset
     * to false even if building the chain throws.
     *
     * <p>If the cluster is not running, nothing is torn down and the current termination future is
     * returned as is.
     *
     * @return the termination future of this mini cluster
     */
    @Override
    public CompletableFuture<Void> closeAsync() {
        Object object = this.lock;
        synchronized (object) {
            if (this.running) {
                LOG.info("Shutting down Flink Mini Cluster");
                try {
                    long shutdownTimeoutMillis = this.miniClusterConfiguration.getConfiguration().getLong(ClusterOptions.CLUSTER_SERVICES_SHUTDOWN_TIMEOUT);
                    // One future per TaskManager plus one for the components (list is pre-sized
                    // to numTaskManagers + 2).
                    int numComponents = 2 + this.miniClusterConfiguration.getNumTaskManagers();
                    ArrayList<CompletableFuture<Void>> componentTerminationFutures = new ArrayList<CompletableFuture<Void>>(numComponents);
                    componentTerminationFutures.addAll(this.terminateTaskManagers());
                    componentTerminationFutures.add(this.shutDownResourceManagerComponents());
                    FutureUtils.ConjunctFuture<Void> componentsTerminationFuture = FutureUtils.completeAll(componentTerminationFutures);
                    // Each step below is chained onto the previous one, so services are released
                    // in this fixed order.
                    CompletableFuture<Void> metricSystemTerminationFuture = FutureUtils.composeAfterwards(componentsTerminationFuture, this::closeMetricSystem);
                    CompletableFuture<Void> rpcServicesTerminationFuture = FutureUtils.composeAfterwards(metricSystemTerminationFuture, this::terminateRpcServices);
                    CompletableFuture<Void> remainingServicesTerminationFuture = FutureUtils.runAfterwards(rpcServicesTerminationFuture, this::terminateMiniClusterServices);
                    CompletableFuture<Void> executorsTerminationFuture = FutureUtils.composeAfterwards(remainingServicesTerminationFuture, () -> this.terminateExecutors(shutdownTimeoutMillis));
                    CompletableFuture<Void> deleteDirectoriesFuture = FutureUtils.runAfterwards(executorsTerminationFuture, this::deleteDirectories);
                    // Forward the final outcome to the termination future, unwrapping any
                    // CompletionException first.
                    deleteDirectoriesFuture.whenComplete((ignored, throwable) -> {
                        if (throwable != null) {
                            this.terminationFuture.completeExceptionally(ExceptionUtils.stripCompletionException(throwable));
                        } else {
                            this.terminationFuture.complete(null);
                        }
                    });
                }
                finally {
                    // Mark as stopped even if setting up the teardown chain failed.
                    this.running = false;
                }
            }
            return this.terminationFuture;
        }
    }

    /**
     * Closes the process metric group and shuts down the metric registry, if present.
     *
     * <p>Both fields are cleared after use, so a second call finds nothing left to close.
     *
     * @return future that completes once the metric registry shutdown (if any) has completed
     */
    private CompletableFuture<Void> closeMetricSystem() {
        synchronized (lock) {
            final List<CompletableFuture<Void>> pendingShutdowns = new ArrayList<>(2);

            // The metric group is closed synchronously; it contributes no future.
            if (processMetricGroup != null) {
                processMetricGroup.close();
                processMetricGroup = null;
            }

            if (metricRegistry != null) {
                final CompletableFuture<Void> registryShutdown = metricRegistry.shutdown();
                pendingShutdowns.add(registryShutdown);
                metricRegistry = null;
            }

            return FutureUtils.completeAll(pendingShutdowns);
        }
    }

    /**
     * Starts as many TaskManagers as the mini cluster configuration asks for.
     *
     * @throws Exception if starting any TaskManager fails; TaskManagers started before the
     *     failure are left as they are
     */
    @GuardedBy("lock")
    private void startTaskManagers() throws Exception {
        final int requestedTaskManagers = miniClusterConfiguration.getNumTaskManagers();
        LOG.info("Starting {} TaskManager(s)", requestedTaskManagers);

        int started = 0;
        while (started < requestedTaskManagers) {
            startTaskManager();
            started++;
        }
    }

    /**
     * Starts one additional TaskManager and registers it with this mini cluster.
     *
     * <p>The new TaskManager gets a random UUID-based resource id, its own RPC service from the
     * TaskManager RPC service factory, a sub working directory named {@code tm_<index>} and a
     * fatal error handler created for the same index, where the index is the number of
     * TaskManagers registered so far.
     *
     * @throws Exception if the TaskManager cannot be created or started
     */
    public void startTaskManager() throws Exception {
        synchronized (lock) {
            final UnmodifiableConfiguration configuration = miniClusterConfiguration.getConfiguration();
            final ResourceID taskManagerResourceId = new ResourceID(UUID.randomUUID().toString());
            // Index of the TaskManager being added == number of TaskManagers already registered.
            final int taskManagerIndex = taskManagers.size();

            final TaskExecutor taskExecutor =
                    TaskManagerRunner.startTaskManager(
                            configuration,
                            taskManagerResourceId,
                            taskManagerRpcServiceFactory.createRpcService(),
                            haServices,
                            heartbeatServices,
                            metricRegistry,
                            blobCacheService,
                            useLocalCommunication(),
                            ExternalResourceInfoProvider.NO_EXTERNAL_RESOURCES,
                            workingDirectory.createSubWorkingDirectory("tm_" + taskManagerIndex),
                            taskManagerTerminatingFatalErrorHandlerFactory.create(taskManagerIndex));

            taskExecutor.start();
            taskManagers.add(taskExecutor);
        }
    }

    /**
     * Decides the local-communication flag passed to newly started TaskManagers.
     *
     * @return {@code true} exactly when the configuration asks for a single TaskManager
     */
    @VisibleForTesting
    protected boolean useLocalCommunication() {
        final int configuredTaskManagers = miniClusterConfiguration.getNumTaskManagers();
        return configuredTaskManagers == 1;
    }

    /**
     * Returns the Flink configuration held by the mini cluster configuration.
     *
     * @return the configuration of this mini cluster
     */
    @VisibleForTesting
    public Configuration getConfiguration() {
        return miniClusterConfiguration.getConfiguration();
    }

    /**
     * Switches on the restore-mode override for the changelog state backend.
     *
     * <p>This only sets the corresponding flag to {@code true}; there is no way to reset it.
     */
    @Internal
    public void overrideRestoreModeForChangelogStateBackend() {
        overrideRestoreModeForChangelogStateBackend = true;
    }

    /**
     * Triggers termination of every registered TaskManager, in index order.
     *
     * @return one termination future per TaskManager, in the same order
     */
    @GuardedBy("lock")
    private Collection<? extends CompletableFuture<Void>> terminateTaskManagers() {
        final List<CompletableFuture<Void>> pendingTerminations = new ArrayList<>(taskManagers.size());

        int index = 0;
        while (index < taskManagers.size()) {
            pendingTerminations.add(terminateTaskManager(index));
            index++;
        }

        return pendingTerminations;
    }

    /**
     * Triggers termination of the TaskManager at the given index.
     *
     * <p>The TaskManager stays in the internal list; only its {@code closeAsync()} is invoked.
     *
     * @param index position of the TaskManager in start order
     * @return the TaskManager's termination future
     * @throws IndexOutOfBoundsException if no TaskManager is registered at {@code index}
     */
    public CompletableFuture<Void> terminateTaskManager(int index) {
        synchronized (lock) {
            return taskManagers.get(index).closeAsync();
        }
    }

    /**
     * Requests the archived execution graph of a job from the dispatcher.
     *
     * @param jobId id of the job
     * @return future with the archived execution graph extracted from the job's execution graph info
     */
    public CompletableFuture<ArchivedExecutionGraph> getArchivedExecutionGraph(JobID jobId) {
        return runDispatcherCommand(
                gateway ->
                        gateway.requestExecutionGraphInfo(jobId, rpcTimeout)
                                .thenApply(ExecutionGraphInfo::getArchivedExecutionGraph));
    }

    /**
     * Lists the jobs known to the dispatcher.
     *
     * @return future with one {@link JobStatusMessage} (id, name, status, start time) per job
     *     reported by the dispatcher
     */
    public CompletableFuture<Collection<JobStatusMessage>> listJobs() {
        return runDispatcherCommand(
                gateway ->
                        gateway.requestMultipleJobDetails(rpcTimeout)
                                .thenApply(
                                        multipleJobsDetails ->
                                                multipleJobsDetails.getJobs().stream()
                                                        .map(
                                                                jobDetails ->
                                                                        new JobStatusMessage(
                                                                                jobDetails.getJobId(),
                                                                                jobDetails.getJobName(),
                                                                                jobDetails.getStatus(),
                                                                                jobDetails.getStartTime()))
                                                        .collect(Collectors.toList())));
    }

    /**
     * Requests the current status of a job from the dispatcher.
     *
     * @param jobId id of the job
     * @return future with the job's status
     */
    public CompletableFuture<JobStatus> getJobStatus(JobID jobId) {
        return runDispatcherCommand(
                gateway -> gateway.requestJobStatus(jobId, rpcTimeout));
    }

    /**
     * Asks the dispatcher to cancel a job.
     *
     * @param jobId id of the job to cancel
     * @return future with the dispatcher's acknowledgement
     */
    public CompletableFuture<Acknowledge> cancelJob(JobID jobId) {
        return runDispatcherCommand(
                gateway -> gateway.cancelJob(jobId, rpcTimeout));
    }

    /**
     * Triggers a savepoint for a job, optionally cancelling the job along with it.
     *
     * @param jobId id of the job
     * @param targetDirectory directory passed to the dispatcher as savepoint target
     * @param cancelJob if {@code true} the savepoint is taken in cancel-with-savepoint mode,
     *     otherwise in plain savepoint mode
     * @param formatType format of the savepoint
     * @return future with the savepoint location reported by the dispatcher
     */
    public CompletableFuture<String> triggerSavepoint(JobID jobId, String targetDirectory, boolean cancelJob, SavepointFormatType formatType) {
        final TriggerSavepointMode savepointMode;
        if (cancelJob) {
            savepointMode = TriggerSavepointMode.CANCEL_WITH_SAVEPOINT;
        } else {
            savepointMode = TriggerSavepointMode.SAVEPOINT;
        }

        return runDispatcherCommand(
                gateway ->
                        gateway.triggerSavepointAndGetLocation(
                                jobId, targetDirectory, formatType, savepointMode, rpcTimeout));
    }

    /**
     * Asks the dispatcher to trigger a checkpoint for a job.
     *
     * @param jobID id of the job
     * @return future with the string result reported by the dispatcher
     */
    public CompletableFuture<String> triggerCheckpoint(JobID jobID) {
        return runDispatcherCommand(
                gateway -> gateway.triggerCheckpoint(jobID, rpcTimeout));
    }

    /**
     * Stops a job with a savepoint.
     *
     * @param jobId id of the job
     * @param targetDirectory directory passed to the dispatcher as savepoint target
     * @param terminate if {@code true} the terminate-with-savepoint mode is used, otherwise the
     *     suspend-with-savepoint mode
     * @param formatType format of the savepoint
     * @return future with the savepoint location reported by the dispatcher
     */
    public CompletableFuture<String> stopWithSavepoint(JobID jobId, String targetDirectory, boolean terminate, SavepointFormatType formatType) {
        final TriggerSavepointMode savepointMode;
        if (terminate) {
            savepointMode = TriggerSavepointMode.TERMINATE_WITH_SAVEPOINT;
        } else {
            savepointMode = TriggerSavepointMode.SUSPEND_WITH_SAVEPOINT;
        }

        return runDispatcherCommand(
                gateway ->
                        gateway.stopWithSavepointAndGetLocation(
                                jobId, targetDirectory, formatType, savepointMode, rpcTimeout));
    }

    /**
     * Asks the dispatcher to dispose the savepoint at the given path.
     *
     * @param savepointPath path of the savepoint to dispose
     * @return future with the dispatcher's acknowledgement
     */
    public CompletableFuture<Acknowledge> disposeSavepoint(String savepointPath) {
        return runDispatcherCommand(
                gateway -> gateway.disposeSavepoint(savepointPath, rpcTimeout));
    }

    /**
     * Requests the execution graph of a job from the dispatcher.
     *
     * @param jobId id of the job
     * @return future with the job's execution graph as returned by the dispatcher's job request
     */
    public CompletableFuture<? extends AccessExecutionGraph> getExecutionGraph(JobID jobId) {
        return runDispatcherCommand(
                gateway -> gateway.requestJob(jobId, rpcTimeout));
    }

    /**
     * Forwards a serialized coordination request to an operator coordinator via the dispatcher.
     *
     * @param jobId id of the job the operator belongs to
     * @param operatorId id of the target operator
     * @param serializedRequest the request in serialized form
     * @return future with the coordinator's response
     */
    public CompletableFuture<CoordinationResponse> deliverCoordinationRequestToCoordinator(JobID jobId, OperatorID operatorId, SerializedValue<CoordinationRequest> serializedRequest) {
        return runDispatcherCommand(
                gateway ->
                        gateway.deliverCoordinationRequestToCoordinator(
                                jobId, operatorId, serializedRequest, rpcTimeout));
    }

    /**
     * Requests the resource overview from the resource manager.
     *
     * @return future with the resource overview
     */
    public CompletableFuture<ResourceOverview> getResourceOverview() {
        return runResourceManagerCommand(
                gateway -> gateway.requestResourceOverview(rpcTimeout));
    }

    /**
     * Runs a command against the dispatcher gateway once that gateway is available.
     *
     * @param dispatcherCommand function turning the gateway into the future of the command result
     * @param <T> type of the command result
     * @return future with the command result; it fails if the gateway lookup, the command
     *     function, or the future returned by the command fails
     */
    private <T> CompletableFuture<T> runDispatcherCommand(Function<DispatcherGateway, CompletableFuture<T>> dispatcherCommand) {
        // thenApply yields a future of a future; thenCompose(identity) flattens it. Keeping the
        // chain fully typed (no raw CompletableFuture cast) lets the compiler check the result type.
        return this.getDispatcherGatewayFuture()
                .thenApply(dispatcherCommand)
                .thenCompose(Function.identity());
    }

    /**
     * Runs a command against the resource manager gateway once that gateway is available.
     *
     * @param resourceManagerCommand function turning the gateway into the future of the command result
     * @param <T> type of the command result
     * @return future with the command result; it fails if the gateway lookup, the command
     *     function, or the future returned by the command fails
     */
    private <T> CompletableFuture<T> runResourceManagerCommand(Function<ResourceManagerGateway, CompletableFuture<T>> resourceManagerCommand) {
        // thenApply yields a future of a future; thenCompose(identity) flattens it. Keeping the
        // chain fully typed (no raw CompletableFuture cast) lets the compiler check the result type.
        return this.getResourceManagerGatewayFuture()
                .thenApply(resourceManagerCommand)
                .thenCompose(Function.identity());
    }

    /**
     * Submits a job and waits only until the submission itself has completed.
     *
     * @param job the job to submit; must not be null
     * @throws JobExecutionException if the submission future fails; the cause is the stripped
     *     execution exception
     * @throws InterruptedException if the calling thread is interrupted while waiting
     */
    public void runDetached(JobGraph job) throws JobExecutionException, InterruptedException {
        Preconditions.checkNotNull(job, "job is null");

        final CompletableFuture<JobSubmissionResult> pendingSubmission = submitJob(job);
        try {
            // Block until the submission is acknowledged; the result itself is not needed.
            pendingSubmission.get();
        } catch (ExecutionException submissionFailure) {
            throw new JobExecutionException(job.getJobID(), ExceptionUtils.stripExecutionException(submissionFailure));
        }
    }

    /**
     * Submits the job, blocks until its {@code JobResult} is available and converts that result
     * into a {@code JobExecutionResult}.
     *
     * @param job the job to execute, must not be {@code null}
     * @return the execution result, deserialized with the calling thread's context class loader
     * @throws JobExecutionException if the submission or result retrieval fails, or if the result
     *     cannot be converted
     * @throws InterruptedException if the calling thread is interrupted while waiting
     */
    public JobExecutionResult executeJobBlocking(JobGraph job) throws JobExecutionException, InterruptedException {
        Preconditions.checkNotNull(job, "job is null");
        CompletableFuture<JobSubmissionResult> submissionFuture = this.submitJob(job);
        // Only ask for the result once the submission has gone through.
        CompletableFuture<JobResult> jobResultFuture =
                submissionFuture.thenCompose(ignored -> this.requestJobResult(job.getJobID()));

        JobResult jobResult;
        try {
            jobResult = jobResultFuture.get();
        } catch (ExecutionException e) {
            throw new JobExecutionException(
                    job.getJobID(),
                    "Could not retrieve JobResult.",
                    ExceptionUtils.stripExecutionException(e));
        }

        try {
            return jobResult.toJobExecutionResult(Thread.currentThread().getContextClassLoader());
        } catch (IOException | ClassNotFoundException e) {
            throw new JobExecutionException(job.getJobID(), e);
        }
    }

    /**
     * Submits a clone of the given job graph to the dispatcher after uploading its files to the
     * blob server.
     *
     * <p>The caller's {@code jobGraph} is cloned first; the restore-mode adjustment, the file
     * upload and the submission all operate on the clone.
     *
     * @param jobGraph the job to submit
     * @return future completed with a {@code JobSubmissionResult} carrying the job id once the
     *     dispatcher has acknowledged the submission
     */
    public CompletableFuture<JobSubmissionResult> submitJob(JobGraph jobGraph) {
        JobGraph clonedJobGraph = MiniCluster.cloneJobGraph(jobGraph);
        this.checkRestoreModeForChangelogStateBackend(clonedJobGraph);

        CompletableFuture<DispatcherGateway> dispatcherGatewayFuture = this.getDispatcherGatewayFuture();
        CompletableFuture<InetSocketAddress> blobServerAddressFuture = this.createBlobServerAddress(dispatcherGatewayFuture);
        CompletableFuture<Void> jarUploadFuture = this.uploadAndSetJobFiles(blobServerAddressFuture, clonedJobGraph);

        // Submit only after the upload finished and the gateway is known; thenCompose flattens
        // the nested future returned by the dispatcher.
        CompletableFuture<Acknowledge> acknowledgeFuture =
                jarUploadFuture
                        .thenCombine(
                                dispatcherGatewayFuture,
                                (Void uploaded, DispatcherGateway dispatcherGateway) ->
                                        dispatcherGateway.submitJob(clonedJobGraph, this.rpcTimeout))
                        .thenCompose(Function.identity());

        return acknowledgeFuture.thenApply(ignored -> new JobSubmissionResult(clonedJobGraph.getJobID()));
    }

    /**
     * Rewrites the job's savepoint restore settings from {@code NO_CLAIM} to {@code LEGACY} when
     * {@code overrideRestoreModeForChangelogStateBackend} is set; otherwise leaves the job graph
     * untouched.
     *
     * @param jobGraph job graph whose restore settings may be replaced
     */
    private void checkRestoreModeForChangelogStateBackend(JobGraph jobGraph) {
        SavepointRestoreSettings currentSettings = jobGraph.getSavepointRestoreSettings();
        if (!this.overrideRestoreModeForChangelogStateBackend
                || currentSettings.getRestoreMode() != RestoreMode.NO_CLAIM) {
            return;
        }
        // Round-trip through a Configuration so every other setting is carried over unchanged.
        Configuration settingsAsConfig = new Configuration();
        SavepointRestoreSettings.toConfiguration(currentSettings, settingsAsConfig);
        settingsAsConfig.set((ConfigOption)SavepointConfigOptions.RESTORE_MODE, RestoreMode.LEGACY);
        jobGraph.setSavepointRestoreSettings(SavepointRestoreSettings.fromConfiguration(settingsAsConfig));
    }

    /**
     * Requests the result of the given job from the dispatcher, passing
     * {@code RpcUtils.INF_TIMEOUT} as the RPC timeout.
     *
     * @param jobId id of the job whose result is requested
     * @return future carrying the job's {@code JobResult}
     */
    public CompletableFuture<JobResult> requestJobResult(JobID jobId) {
        Function<DispatcherGateway, CompletableFuture<JobResult>> resultCommand =
                gateway -> gateway.requestJobResult(jobId, RpcUtils.INF_TIMEOUT);
        return this.runDispatcherCommand(resultCommand);
    }

    /**
     * Requests the cluster overview from the dispatcher, passing {@code RpcUtils.INF_TIMEOUT} as
     * the RPC timeout.
     *
     * @return future carrying the {@code ClusterOverview}
     */
    public CompletableFuture<ClusterOverview> requestClusterOverview() {
        Function<DispatcherGateway, CompletableFuture<ClusterOverview>> overviewCommand =
                gateway -> gateway.requestClusterOverview(RpcUtils.INF_TIMEOUT);
        return this.runDispatcherCommand(overviewCommand);
    }

    /**
     * Returns the future of the dispatcher gateway held by {@code dispatcherGatewayRetriever}.
     *
     * <p>While holding {@code lock}, the method first checks that the mini cluster is running.
     *
     * @return future of the dispatcher gateway
     */
    @VisibleForTesting
    protected CompletableFuture<DispatcherGateway> getDispatcherGatewayFuture() {
        synchronized (this.lock) {
            Preconditions.checkState(
                    this.running,
                    "MiniCluster is not yet running or has already been shut down.");
            return this.dispatcherGatewayRetriever.getFuture();
        }
    }

    /**
     * Returns the future of the resource manager gateway held by
     * {@code resourceManagerGatewayRetriever}.
     *
     * <p>While holding {@code lock}, the method first checks that the mini cluster is running.
     *
     * @return future of the resource manager gateway
     */
    private CompletableFuture<ResourceManagerGateway> getResourceManagerGatewayFuture() {
        synchronized (this.lock) {
            Preconditions.checkState(
                    this.running,
                    "MiniCluster is not yet running or has already been shut down.");
            return this.resourceManagerGatewayRetriever.getFuture();
        }
    }

    /**
     * Once the blob server address is known, extracts and uploads the job graph's files through
     * a {@code BlobClient} connected to that address.
     *
     * @param blobServerAddressFuture future of the blob server address to upload to
     * @param job job graph whose files are uploaded
     * @return future completed when the upload is done; a {@code FlinkException} from the upload
     *     is rethrown wrapped in a {@code CompletionException}
     */
    private CompletableFuture<Void> uploadAndSetJobFiles(CompletableFuture<InetSocketAddress> blobServerAddressFuture, JobGraph job) {
        return blobServerAddressFuture.thenAccept(address -> {
            try {
                ClientUtils.extractAndUploadJobGraphFiles(
                        job,
                        () -> new BlobClient(address, this.miniClusterConfiguration.getConfiguration()));
            } catch (FlinkException uploadFailure) {
                // Checked exceptions cannot leave the lambda, so wrap for the future machinery.
                throw new CompletionException(uploadFailure);
            }
        });
    }

    /**
     * Derives the blob server's socket address from the dispatcher gateway: the gateway's
     * hostname combined with the port it reports via {@code getBlobServerPort}.
     *
     * @param dispatcherGatewayFuture future of the dispatcher gateway to query
     * @return future of the blob server address
     */
    private CompletableFuture<InetSocketAddress> createBlobServerAddress(CompletableFuture<DispatcherGateway> dispatcherGatewayFuture) {
        // thenCompose flattens the port request without the raw CompletableFuture cast.
        return dispatcherGatewayFuture.thenCompose(
                dispatcherGateway ->
                        dispatcherGateway
                                .getBlobServerPort(this.rpcTimeout)
                                .thenApply(
                                        blobServerPort ->
                                                new InetSocketAddress(
                                                        dispatcherGateway.getHostname(),
                                                        (int) blobServerPort)));
    }

    /**
     * Builds the metric registry from the given configuration.
     *
     * @param config configuration the registry configuration and reporter setups are read from
     * @param maximumMessageSizeInBytes value forwarded to
     *     {@code MetricRegistryConfiguration.fromConfiguration}
     * @return a new {@code MetricRegistryImpl}
     */
    protected MetricRegistryImpl createMetricRegistry(Configuration config, long maximumMessageSizeInBytes) {
        MetricRegistryConfiguration registryConfiguration =
                MetricRegistryConfiguration.fromConfiguration(config, maximumMessageSizeInBytes);
        return new MetricRegistryImpl(
                registryConfiguration,
                ReporterSetup.fromConfiguration(config, null));
    }

    /**
     * Creates and starts a remote RPC service that uses the same address and port both as
     * external endpoint and as bind endpoint.
     *
     * @param configuration configuration handed to the service builder
     * @param bindAddress address used as external address and as bind address
     * @param bindPort port used as external port (in string form) and as bind port
     * @param rpcSystem RPC system providing the service builder
     * @return the started RPC service
     * @throws Exception if the service cannot be created or started
     */
    protected RpcService createRemoteRpcService(Configuration configuration, String bindAddress, int bindPort, RpcSystem rpcSystem) throws Exception {
        String externalPortRange = Integer.toString(bindPort);
        return rpcSystem
                .remoteServiceBuilder(configuration, bindAddress, externalPortRange)
                .withBindAddress(bindAddress)
                .withBindPort(bindPort)
                .withExecutorConfiguration(RpcUtils.getTestForkJoinExecutorConfiguration())
                .createAndStart();
    }

    /**
     * Creates and starts a remote RPC service with separate external and bind addresses.
     *
     * @param configuration configuration handed to the service builder
     * @param externalAddress external address passed to the builder
     * @param externalPortRange external port range passed to the builder
     * @param bindAddress address the service binds to
     * @param rpcSystem RPC system providing the service builder
     * @return the started RPC service
     * @throws Exception if the service cannot be created or started
     */
    protected RpcService createRemoteRpcService(Configuration configuration, String externalAddress, String externalPortRange, String bindAddress, RpcSystem rpcSystem) throws Exception {
        return rpcSystem
                .remoteServiceBuilder(configuration, externalAddress, externalPortRange)
                .withBindAddress(bindAddress)
                .withExecutorConfiguration(RpcUtils.getTestForkJoinExecutorConfiguration())
                .createAndStart();
    }

    /**
     * Creates and starts a local RPC service.
     *
     * @param configuration configuration handed to the service builder
     * @param rpcSystem RPC system providing the service builder
     * @return the started RPC service
     * @throws Exception if the service cannot be created or started
     */
    protected RpcService createLocalRpcService(Configuration configuration, RpcSystem rpcSystem) throws Exception {
        return rpcSystem
                .localServiceBuilder(configuration)
                .withExecutorConfiguration(RpcUtils.getTestForkJoinExecutorConfiguration())
                .createAndStart();
    }

    /**
     * Closes all dispatcher/resource-manager components asynchronously and, after all of them
     * have completed, stops the three leader retrieval services.
     *
     * <p>Each retrieval service is stopped under {@code lock} and its field is cleared even if
     * stopping failed. Failures are combined via {@code ExceptionUtils.firstOrSuppressed} and
     * rethrown at the end of the follow-up action.
     *
     * @return future completed once the components are closed and the follow-up action has run
     */
    @GuardedBy("lock")
    private CompletableFuture<Void> shutDownResourceManagerComponents() {
        Collection<CompletableFuture<Void>> componentTerminations =
                new ArrayList<>(this.dispatcherResourceManagerComponents.size());
        for (DispatcherResourceManagerComponent component : this.dispatcherResourceManagerComponents) {
            componentTerminations.add(component.closeAsync());
        }
        FutureUtils.ConjunctFuture<Void> allComponentsTerminated =
                FutureUtils.completeAll(componentTerminations);

        return FutureUtils.runAfterwards(allComponentsTerminated, () -> {
            Exception firstFailure = null;
            synchronized (this.lock) {
                if (this.resourceManagerLeaderRetriever != null) {
                    try {
                        this.resourceManagerLeaderRetriever.stop();
                    } catch (Exception stopFailure) {
                        firstFailure = ExceptionUtils.firstOrSuppressed(stopFailure, firstFailure);
                    }
                    this.resourceManagerLeaderRetriever = null;
                }

                if (this.dispatcherLeaderRetriever != null) {
                    try {
                        this.dispatcherLeaderRetriever.stop();
                    } catch (Exception stopFailure) {
                        firstFailure = ExceptionUtils.firstOrSuppressed(stopFailure, firstFailure);
                    }
                    this.dispatcherLeaderRetriever = null;
                }

                if (this.clusterRestEndpointLeaderRetrievalService != null) {
                    try {
                        this.clusterRestEndpointLeaderRetrievalService.stop();
                    } catch (Exception stopFailure) {
                        firstFailure = ExceptionUtils.firstOrSuppressed(stopFailure, firstFailure);
                    }
                    this.clusterRestEndpointLeaderRetrievalService = null;
                }
            }
            if (firstFailure != null) {
                throw firstFailure;
            }
        });
    }

    /**
     * Shuts down the blob cache service, the blob server, the high-availability services and,
     * if owned, the RPC system, all while holding {@code lock}.
     *
     * <p>Every step is attempted even if an earlier one failed; the corresponding field is
     * cleared regardless of the outcome. Failures are combined via
     * {@code ExceptionUtils.firstOrSuppressed} and rethrown at the end.
     *
     * @throws Exception the combined failure, if any step failed
     */
    private void terminateMiniClusterServices() throws Exception {
        Exception firstFailure = null;
        synchronized (this.lock) {
            if (this.blobCacheService != null) {
                try {
                    this.blobCacheService.close();
                } catch (Exception closeFailure) {
                    firstFailure = ExceptionUtils.firstOrSuppressed(closeFailure, firstFailure);
                }
                this.blobCacheService = null;
            }

            if (this.blobServer != null) {
                try {
                    this.blobServer.close();
                } catch (Exception closeFailure) {
                    firstFailure = ExceptionUtils.firstOrSuppressed(closeFailure, firstFailure);
                }
                this.blobServer = null;
            }

            if (this.haServices != null) {
                try {
                    this.haServices.closeAndCleanupAllData();
                } catch (Exception closeFailure) {
                    firstFailure = ExceptionUtils.firstOrSuppressed(closeFailure, firstFailure);
                }
                this.haServices = null;
            }

            // The RPC system is only closed when this cluster owns it.
            try {
                if (this.rpcSystem.isOwned()) {
                    this.rpcSystem.deref().close();
                }
            } catch (Exception closeFailure) {
                firstFailure = ExceptionUtils.firstOrSuppressed(closeFailure, firstFailure);
            }

            if (firstFailure != null) {
                throw firstFailure;
            }
        }
    }

    /**
     * Stops the common RPC service and every RPC service registered in {@code rpcServices}.
     *
     * <p>Runs under {@code lock}. After the stop requests have been issued,
     * {@code commonRpcService} is cleared and {@code rpcServices} is emptied.
     *
     * @return future that completes once all stop futures have completed
     */
    @Nonnull
    private CompletableFuture<Void> terminateRpcServices() {
        synchronized (this.lock) {
            int numRpcServices = 1 + this.rpcServices.size();
            // Wildcard element type instead of a raw CompletableFuture keeps the list type-safe.
            Collection<CompletableFuture<?>> rpcTerminationFutures = new ArrayList<>(numRpcServices);
            rpcTerminationFutures.add(this.commonRpcService.stopService());
            for (RpcService rpcService : this.rpcServices) {
                rpcTerminationFutures.add(rpcService.stopService());
            }
            this.commonRpcService = null;
            this.rpcServices.clear();
            return FutureUtils.completeAll(rpcTerminationFutures);
        }
    }

    /**
     * Shuts down the I/O executor without blocking, if one exists.
     *
     * @param executorShutdownTimeoutMillis shutdown timeout in milliseconds passed to
     *     {@code ExecutorUtils.nonBlockingShutdown}
     * @return the shutdown future, or an already completed future when there is no I/O executor
     */
    private CompletableFuture<Void> terminateExecutors(long executorShutdownTimeoutMillis) {
        synchronized (this.lock) {
            if (this.ioExecutor == null) {
                return CompletableFuture.completedFuture(null);
            }
            return ExecutorUtils.nonBlockingShutdown(
                    executorShutdownTimeoutMillis, TimeUnit.MILLISECONDS, this.ioExecutor);
        }
    }

    /**
     * Deletes the working directory, if one is set, while holding {@code lock}.
     *
     * @throws IOException if deleting the working directory fails
     */
    private void deleteDirectories() throws IOException {
        synchronized (this.lock) {
            if (this.workingDirectory == null) {
                return;
            }
            this.workingDirectory.delete();
        }
    }

    /**
     * Creates a copy of the given job graph via {@code InstantiationUtil.clone}.
     *
     * @param jobGraph job graph to copy
     * @return the copy
     * @throws IllegalStateException if cloning fails with an {@code IOException} or
     *     {@code ClassNotFoundException}
     */
    private static JobGraph cloneJobGraph(JobGraph jobGraph) {
        final JobGraph copy;
        try {
            copy = InstantiationUtil.clone(jobGraph);
        } catch (IOException | ClassNotFoundException cloneFailure) {
            throw new IllegalStateException("Unable to clone the provided JobGraph.", cloneFailure);
        }
        return copy;
    }

    /**
     * Initializes the static defaults of {@code FileOutputFormat} from the given configuration.
     *
     * @param configuration configuration passed to
     *     {@code FileOutputFormat.initDefaultsFromConfiguration}
     */
    private void initializeIOFormatClasses(Configuration configuration) {
        FileOutputFormat.initDefaultsFromConfiguration(configuration);
    }

    /**
     * Factory that creates high-availability services through
     * {@code HighAvailabilityServicesUtils.createAvailableOrEmbeddedServices}, wiring in a
     * {@code ShutDownFatalErrorHandler} of the enclosing mini cluster.
     */
    private class RegularHighAvailabilityServicesFactory
    implements HighAvailabilityServicesFactory {
        private RegularHighAvailabilityServicesFactory() {
        }

        /**
         * Creates the high-availability services for the given configuration and executor.
         *
         * @throws Exception if the services cannot be created
         */
        @Override
        public HighAvailabilityServices createHAServices(Configuration configuration, Executor executor) throws Exception {
            FatalErrorHandler shutDownOnFatalError = new ShutDownFatalErrorHandler();
            return HighAvailabilityServicesUtils.createAvailableOrEmbeddedServices(
                    configuration, executor, shutDownOnFatalError);
        }
    }

    /**
     * Factory that creates the high-availability services at most once through the supplied
     * callback and returns the same instance on every later call.
     */
    private static class SingletonHighAvailabilityServicesFactory
    implements HighAvailabilityServicesFactory {
        private final BiFunctionWithException<Configuration, Executor, HighAvailabilityServices, Exception> creationCallback;
        /** Cached instance; {@code null} until the first successful creation. */
        @Nullable
        private HighAvailabilityServices haServices;

        public SingletonHighAvailabilityServicesFactory(BiFunctionWithException<Configuration, Executor, HighAvailabilityServices, Exception> creationCallback) {
            this.creationCallback = creationCallback;
        }

        /**
         * Returns the cached services, invoking the callback first when none have been created
         * yet.
         *
         * @throws Exception if the callback fails
         */
        @Override
        public HighAvailabilityServices createHAServices(Configuration configuration, Executor executor) throws Exception {
            if (this.haServices != null) {
                return this.haServices;
            }
            this.haServices = this.creationCallback.apply(configuration, executor);
            return this.haServices;
        }
    }

    /**
     * Selects the flavour of high-availability services.
     *
     * <p>NOTE(review): the semantics below are inferred from the constant names only; confirm
     * against the code that consumes this enum.
     */
    public static enum HaServices {
        // Presumably: use the HA services as given by the configuration.
        CONFIGURED,
        // Presumably: use HA services that allow leadership to be controlled explicitly.
        WITH_LEADERSHIP_CONTROL;

    }

    /** Factory for {@code TerminatingFatalErrorHandler} instances bound to the enclosing cluster. */
    private class TerminatingFatalErrorHandlerFactory {
        private TerminatingFatalErrorHandlerFactory() {
        }

        /**
         * Creates a handler for the task manager at the given index.
         *
         * @param index index the handler will use to look up the task manager
         * @return a new handler
         */
        @GuardedBy("lock")
        private TerminatingFatalErrorHandler create(int index) {
            TerminatingFatalErrorHandler handler = new TerminatingFatalErrorHandler(index);
            return handler;
        }
    }

    private class ShutDownFatalErrorHandler
    implements FatalErrorHandler {
        private ShutDownFatalErrorHandler() {
        }

        public void onFatalError(Throwable exception) {
            LOG.warn("Error in MiniCluster. Shutting the MiniCluster down.", exception);
            MiniCluster.this.closeAsync();
        }
    }

    private class TerminatingFatalErrorHandler
    implements FatalErrorHandler {
        private final int index;

        private TerminatingFatalErrorHandler(int index) {
            this.index = index;
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        public void onFatalError(Throwable exception) {
            if (MiniCluster.this.running) {
                LOG.error("TaskManager #{} failed.", (Object)this.index, (Object)exception);
                Object object = MiniCluster.this.lock;
                synchronized (object) {
                    ((TaskExecutor)MiniCluster.this.taskManagers.get(this.index)).closeAsync();
                }
            }
        }
    }

    /**
     * Factory that starts a new remote RPC service on every call and registers it in the
     * enclosing cluster's {@code rpcServices} collection.
     */
    protected class DedicatedRpcServiceFactory
    implements RpcServiceFactory {
        private final Configuration configuration;
        private final String externalAddress;
        private final String externalPortRange;
        private final String bindAddress;
        private final RpcSystem rpcSystem;

        DedicatedRpcServiceFactory(
                Configuration configuration,
                String externalAddress,
                String externalPortRange,
                String bindAddress,
                RpcSystem rpcSystem) {
            this.configuration = configuration;
            this.externalAddress = externalAddress;
            this.externalPortRange = externalPortRange;
            this.bindAddress = bindAddress;
            this.rpcSystem = rpcSystem;
        }

        /**
         * Creates a remote RPC service from this factory's settings and adds it to
         * {@code rpcServices} under the cluster's lock.
         *
         * @return the newly started RPC service
         * @throws Exception if the service cannot be created
         */
        @Override
        public RpcService createRpcService() throws Exception {
            RpcService dedicatedService =
                    MiniCluster.this.createRemoteRpcService(
                            this.configuration,
                            this.externalAddress,
                            this.externalPortRange,
                            this.bindAddress,
                            this.rpcSystem);
            synchronized (MiniCluster.this.lock) {
                MiniCluster.this.rpcServices.add(dedicatedService);
            }
            return dedicatedService;
        }
    }

    /** Factory that always hands out the single RPC service it was constructed with. */
    protected static class CommonRpcServiceFactory
    implements RpcServiceFactory {
        private final RpcService commonRpcService;

        CommonRpcServiceFactory(RpcService commonRpcService) {
            this.commonRpcService = commonRpcService;
        }

        /**
         * Returns the shared RPC service; no new service is created.
         *
         * @return the RPC service passed to the constructor
         */
        @Override
        public RpcService createRpcService() {
            return commonRpcService;
        }
    }

    protected static interface RpcServiceFactory {
        public RpcService createRpcService() throws Exception;
    }
}

