/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.checkpoint;

import java.util.Deque;
import java.util.Optional;
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.runtime.checkpoint.CheckpointsCleaner;
import org.apache.flink.runtime.checkpoint.CompletedCheckpoint;
import org.apache.flink.runtime.checkpoint.CompletedCheckpointStore;
import org.apache.flink.runtime.state.SharedStateRegistry;

/**
 * Base class for {@link CompletedCheckpointStore} implementations that holds the
 * {@link SharedStateRegistry} handed to it at construction time and offers a helper for
 * unregistering shared state based on the checkpoints that are still retained.
 */
public abstract class AbstractCompleteCheckpointStore implements CompletedCheckpointStore {

    /** Registry supplied by the subclass; exposed via {@link #getSharedStateRegistry()}. */
    private final SharedStateRegistry sharedStateRegistry;

    /**
     * Creates the store around the given registry.
     *
     * @param sharedStateRegistry the registry this store hands out and closes on shutdown;
     *     stored as-is, without validation
     */
    public AbstractCompleteCheckpointStore(SharedStateRegistry sharedStateRegistry) {
        this.sharedStateRegistry = sharedStateRegistry;
    }

    /**
     * Returns the registry that was passed to the constructor.
     *
     * @return the shared state registry held by this store
     */
    @Override
    public SharedStateRegistry getSharedStateRegistry() {
        return sharedStateRegistry;
    }

    /**
     * Closes the shared state registry if, and only if, the job reached a globally terminal
     * state. For any other status this implementation does nothing.
     *
     * @param jobStatus the status the job was in when the store is shut down
     * @param checkpointsCleaner not used by this base implementation
     * @throws Exception if closing the registry fails
     */
    @Override
    public void shutdown(JobStatus jobStatus, CheckpointsCleaner checkpointsCleaner)
            throws Exception {
        if (jobStatus.isGloballyTerminalState()) {
            sharedStateRegistry.close();
        }
    }

    /**
     * Determines the ID of the first non-savepoint checkpoint among the given checkpoints and,
     * when there is one, passes it to the registry's {@code unregisterUnusedState}. When every
     * entry is a savepoint (or the deque is empty) the registry method is not invoked.
     *
     * @param unSubsumedCheckpoints the checkpoints to inspect, visited in the deque's iteration
     *     order
     */
    protected void unregisterUnusedState(Deque<CompletedCheckpoint> unSubsumedCheckpoints) {
        Optional<Long> lowestCheckpointId = findLowest(unSubsumedCheckpoints);
        lowestCheckpointId.ifPresent(sharedStateRegistry::unregisterUnusedState);
    }

    /**
     * Returns the ID of the first checkpoint, in iteration order, that is not a savepoint.
     *
     * <p>NOTE(review): the name suggests the deque is ordered by ascending checkpoint ID so that
     * the first match is also the lowest ID; this method itself does not compare IDs - confirm
     * the ordering with callers.
     *
     * @param unSubsumedCheckpoints the checkpoints to scan
     * @return the ID of the first non-savepoint checkpoint, or an empty {@link Optional} if
     *     there is none
     */
    private static Optional<Long> findLowest(Deque<CompletedCheckpoint> unSubsumedCheckpoints) {
        for (CompletedCheckpoint checkpoint : unSubsumedCheckpoints) {
            if (!checkpoint.getProperties().isSavepoint()) {
                return Optional.of(checkpoint.getCheckpointID());
            }
        }
        return Optional.empty();
    }
}

