/*
 * Decompiled with CFR 0.152.
 */
package org.graylog.storage.opensearch2;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.github.rholder.retry.Attempt;
import com.github.rholder.retry.RetryException;
import com.github.rholder.retry.RetryListener;
import com.github.rholder.retry.Retryer;
import com.github.rholder.retry.RetryerBuilder;
import com.github.rholder.retry.StopStrategies;
import com.github.rholder.retry.WaitStrategies;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import jakarta.annotation.Nonnull;
import jakarta.annotation.Nullable;
import jakarta.inject.Inject;
import jakarta.inject.Singleton;
import jakarta.ws.rs.ForbiddenException;
import java.io.IOException;
import java.io.InputStream;
import java.net.MalformedURLException;
import java.net.URI;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang.time.DurationFormatUtils;
import org.graylog.shaded.opensearch2.org.opensearch.OpenSearchException;
import org.graylog.shaded.opensearch2.org.opensearch.action.admin.cluster.health.ClusterHealthRequest;
import org.graylog.shaded.opensearch2.org.opensearch.action.admin.cluster.health.ClusterHealthResponse;
import org.graylog.shaded.opensearch2.org.opensearch.action.admin.cluster.settings.ClusterGetSettingsRequest;
import org.graylog.shaded.opensearch2.org.opensearch.action.admin.cluster.settings.ClusterGetSettingsResponse;
import org.graylog.shaded.opensearch2.org.opensearch.action.admin.indices.open.OpenIndexRequest;
import org.graylog.shaded.opensearch2.org.opensearch.action.admin.indices.settings.put.UpdateSettingsRequest;
import org.graylog.shaded.opensearch2.org.opensearch.action.support.master.AcknowledgedResponse;
import org.graylog.shaded.opensearch2.org.opensearch.client.Request;
import org.graylog.shaded.opensearch2.org.opensearch.client.RequestOptions;
import org.graylog.shaded.opensearch2.org.opensearch.client.Response;
import org.graylog.shaded.opensearch2.org.opensearch.client.ResponseException;
import org.graylog.shaded.opensearch2.org.opensearch.client.indices.CloseIndexRequest;
import org.graylog.shaded.opensearch2.org.opensearch.client.tasks.TaskSubmissionResponse;
import org.graylog.shaded.opensearch2.org.opensearch.cluster.health.ClusterHealthStatus;
import org.graylog.shaded.opensearch2.org.opensearch.common.xcontent.json.JsonXContent;
import org.graylog.shaded.opensearch2.org.opensearch.core.common.bytes.BytesReference;
import org.graylog.shaded.opensearch2.org.opensearch.core.xcontent.ToXContent;
import org.graylog.shaded.opensearch2.org.opensearch.core.xcontent.XContentBuilder;
import org.graylog.shaded.opensearch2.org.opensearch.index.query.QueryBuilders;
import org.graylog.shaded.opensearch2.org.opensearch.index.reindex.ReindexRequest;
import org.graylog.shaded.opensearch2.org.opensearch.index.reindex.RemoteInfo;
import org.graylog.storage.opensearch2.AggregatedConnectionResponse;
import org.graylog.storage.opensearch2.BlockResponse;
import org.graylog.storage.opensearch2.ConnectionCheckRequest;
import org.graylog.storage.opensearch2.DatanodeRemoteConnectionCheckResource;
import org.graylog.storage.opensearch2.DatanodeRemoteIndexStateResource;
import org.graylog.storage.opensearch2.GetTaskResponse;
import org.graylog.storage.opensearch2.IndexState;
import org.graylog.storage.opensearch2.IndexStateChangeRequest;
import org.graylog.storage.opensearch2.IndexStateGetRequest;
import org.graylog.storage.opensearch2.OpenSearchClient;
import org.graylog.storage.opensearch2.RemoteReindexAllowlist;
import org.graylog.storage.opensearch2.Task;
import org.graylog.storage.opensearch2.TaskResponseFailure;
import org.graylog2.cluster.lock.Lock;
import org.graylog2.datanode.RemoteReindexAllowlistEvent;
import org.graylog2.events.ClusterEventBus;
import org.graylog2.indexer.IndexSet;
import org.graylog2.indexer.IndexSetRegistry;
import org.graylog2.indexer.datanode.DatanodeMigrationLockService;
import org.graylog2.indexer.datanode.DatanodeMigrationLockWaitConfig;
import org.graylog2.indexer.datanode.IndexMigrationConfiguration;
import org.graylog2.indexer.datanode.MigrationConfiguration;
import org.graylog2.indexer.datanode.RemoteReindexMigrationService;
import org.graylog2.indexer.datanode.RemoteReindexRequest;
import org.graylog2.indexer.datanode.RemoteReindexingMigrationAdapter;
import org.graylog2.indexer.indices.Indices;
import org.graylog2.indexer.migration.IndexMigrationProgress;
import org.graylog2.indexer.migration.IndexerConnectionCheckResult;
import org.graylog2.indexer.migration.LogEntry;
import org.graylog2.indexer.migration.LogLevel;
import org.graylog2.indexer.migration.RemoteIndex;
import org.graylog2.indexer.migration.RemoteReindexIndex;
import org.graylog2.indexer.migration.RemoteReindexMigration;
import org.graylog2.indexer.migration.TaskStatus;
import org.graylog2.indexer.ranges.CreateNewSingleIndexRangeJob;
import org.graylog2.indexer.ranges.RebuildIndexRangesJob;
import org.graylog2.notifications.Notification;
import org.graylog2.notifications.NotificationService;
import org.graylog2.periodical.IndexRangesCleanupPeriodical;
import org.graylog2.plugin.Tools;
import org.graylog2.rest.resources.datanodes.DatanodeRestApiProxy;
import org.graylog2.system.jobs.SystemJob;
import org.graylog2.system.jobs.SystemJobConcurrencyException;
import org.graylog2.system.jobs.SystemJobManager;
import org.joda.time.DateTime;
import org.joda.time.DateTimeZone;
import org.joda.time.Duration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Singleton
public class RemoteReindexingMigrationAdapterOS2
implements RemoteReindexingMigrationAdapter {
    /** Logger shared by all instances of this adapter. */
    private static final Logger LOG = LoggerFactory.getLogger(RemoteReindexingMigrationAdapterOS2.class);
    // NOTE(review): not referenced by name in the visible part of this file; the literal 40 passed to
    // StopStrategies.stopAfterAttempt in waitForClusterRestart presumably corresponds to it — confirm.
    private static final int CONNECTION_ATTEMPTS = 40;
    // NOTE(review): presumably the 3 second fixed wait used in waitForClusterRestart — confirm.
    private static final int WAIT_BETWEEN_CONNECTION_ATTEMPTS = 3;
    // NOTE(review): sleep() waits the same 1000 ms between task polls; presumably that literal is this constant — confirm.
    public static final int TASK_UPDATE_INTERVAL_MILLIS = 1000;
    // All collaborators below are assigned exactly once, in the constructor.
    // Executes every request this class sends through client.execute(...).
    private final OpenSearchClient client;
    // Used for exists/create/refresh calls on indices.
    private final Indices indices;
    // Resolves index names to index sets (getForIndex, getForIndices, getDefault, isManagedIndex).
    private final IndexSetRegistry indexSetRegistry;
    // Receives the RemoteReindexAllowlistEvent posted by allowReindexing.
    private final ClusterEventBus eventBus;
    // Deserializes the JSON bodies of low-level client responses.
    private final ObjectMapper objectMapper;
    // Stores and loads migration configurations, task assignments and migration log entries.
    private final RemoteReindexMigrationService reindexMigrationService;
    // Invokes datanode REST resources (connection check, remote index state).
    private final DatanodeRestApiProxy datanodeRestApiProxy;
    // Not used in the visible part of this file.
    private final NotificationService notificationService;
    // Not used in the visible part of this file.
    private final IndexRangesCleanupPeriodical indexRangesCleanupPeriodical;
    // Not used in the visible part of this file.
    private final RebuildIndexRangesJob.Factory rebuildIndexRangesJobFactory;
    // Creates the per-index range job submitted by recalculateIndexRanges.
    private final CreateNewSingleIndexRangeJob.Factory singleIndexRangeJobFactory;
    // Receives the index range jobs submitted by recalculateIndexRanges.
    private final SystemJobManager systemJobManager;
    // Acquires the per-index-set locks in lockIndexSets.
    private final DatanodeMigrationLockService migrationLockService;
    /**
     * Creates the adapter; every argument is stored unchanged in the field of the same name.
     * No validation or other work is performed here.
     */
    @Inject
    public RemoteReindexingMigrationAdapterOS2(
            OpenSearchClient client,
            Indices indices,
            IndexSetRegistry indexSetRegistry,
            ClusterEventBus eventBus,
            ObjectMapper objectMapper,
            RemoteReindexMigrationService reindexMigrationService,
            DatanodeRestApiProxy datanodeRestApiProxy,
            NotificationService notificationService,
            IndexRangesCleanupPeriodical indexRangesCleanupPeriodical,
            RebuildIndexRangesJob.Factory rebuildIndexRangesJobFactory,
            CreateNewSingleIndexRangeJob.Factory singleIndexRangeJobFactory,
            SystemJobManager systemJobManager,
            DatanodeMigrationLockService migrationLockService) {
        // Cluster access and index bookkeeping.
        this.client = client;
        this.indices = indices;
        this.indexSetRegistry = indexSetRegistry;
        this.datanodeRestApiProxy = datanodeRestApiProxy;
        this.objectMapper = objectMapper;

        // Migration state, locking and eventing.
        this.reindexMigrationService = reindexMigrationService;
        this.migrationLockService = migrationLockService;
        this.eventBus = eventBus;
        this.notificationService = notificationService;

        // Index range maintenance.
        this.indexRangesCleanupPeriodical = indexRangesCleanupPeriodical;
        this.rebuildIndexRangesJobFactory = rebuildIndexRangesJobFactory;
        this.singleIndexRangeJobFactory = singleIndexRangeJobFactory;
        this.systemJobManager = systemJobManager;
    }

    /**
     * Tells whether the latest known migration is still working on the given index set.
     *
     * @param indexSet index set to check
     * @return {@code false} when there is no latest migration id, otherwise the result of
     *         {@code isIndexSetCurrentlyMigrated} for the latest migration
     */
    public boolean isMigrationRunning(IndexSet indexSet) {
        final Optional<RemoteReindexMigration> latestMigration =
                this.reindexMigrationService.getLatestMigrationId().map(this::status);
        return latestMigration
                .map(migration -> this.isIndexSetCurrentlyMigrated(indexSet, migration))
                .orElse(false);
    }

    /**
     * Checks whether the given index set owns at least one index of the migration that is not completed yet.
     *
     * @param indexSet index set to look for
     * @param mig      migration whose status and indices are inspected
     * @return {@code false} unless the migration is NOT_STARTED or RUNNING; otherwise whether the index sets
     *         resolved for the not-completed indices contain {@code indexSet}
     */
    @Nonnull
    private Boolean isIndexSetCurrentlyMigrated(IndexSet indexSet, RemoteReindexMigration mig) {
        // Guard clause: only migrations that have not finished yet can block an index set.
        if (mig.status() != RemoteReindexingMigrationAdapter.Status.NOT_STARTED && mig.status() != RemoteReindexingMigrationAdapter.Status.RUNNING) {
            return false;
        }
        final Set<String> pendingIndexNames = mig.indices().stream()
                .filter(index -> !index.isCompleted())
                .map(RemoteReindexIndex::name)
                .collect(Collectors.toSet());
        return this.indexSetRegistry.getForIndices(pendingIndexNames).contains(indexSet);
    }

    /**
     * Persists a new migration for the requested indices and kicks off its execution.
     *
     * @param request connection details and the indices to migrate
     * @return id of the saved migration configuration
     */
    public String start(RemoteReindexRequest request) {
        // The connection check response supplies the certificates stored with the migration.
        final AggregatedConnectionResponse connection = this.getAllIndicesFrom(
                request.uri(), request.username(), request.password(), request.trustUnknownCerts());
        final MigrationConfiguration unsaved =
                MigrationConfiguration.forIndices((List)request.indices(), connection.certificates());
        final MigrationConfiguration saved = this.reindexMigrationService.saveMigration(unsaved);
        this.doStartMigration(saved, request);
        return saved.id();
    }

    /**
     * Runs the migration start-up sequence on a new background thread: prepare the cluster, lock the
     * affected index sets, create the target indices, start the reindex tasks, recalculate index
     * ranges and create the RUNNING notification.
     *
     * <p>Failures inside the background thread are caught there and written to the application log
     * and the migration log. The outer {@code try/catch} can only observe failures of creating or
     * starting the thread itself, so without the inner handler an exception in any start-up step
     * would kill the thread without leaving a trace in the migration log.
     *
     * @param migration persisted migration configuration
     * @param request   original request carrying connection details
     * @throws RuntimeException if the background thread cannot be started
     */
    private void doStartMigration(MigrationConfiguration migration, RemoteReindexRequest request) {
        try {
            new Thread(() -> {
                try {
                    this.prepareCluster(request, migration);
                    Set<Lock> locks = this.lockIndexSets(migration);
                    this.createIndicesInNewCluster(migration);
                    this.startAsyncTasks(migration, request, locks);
                    this.recaluculateAllIndexRanges();
                    this.createSystemNotification(RemoteReindexingMigrationAdapter.Status.RUNNING);
                }
                catch (Exception e) {
                    // NOTE(review): locks acquired before the failure are not released here — confirm
                    // whether the lock service expires them or an explicit release is needed.
                    this.logError(migration, "Failed to start remote reindex migration", e);
                }
            }).start();
        }
        catch (Exception e) {
            LOG.error("Failed to start remote reindex migration", (Throwable)e);
            throw new RuntimeException(e);
        }
    }

    /**
     * Acquires one migration lock per distinct index set that owns an index of the migration.
     * Indices for which the registry returns no index set are skipped.
     *
     * @param migration migration whose indices determine the index sets to lock
     * @return the acquired locks
     */
    private Set<Lock> lockIndexSets(MigrationConfiguration migration) {
        // NOTE(review): the two durations (5 seconds, 30 minutes) are presumably retry delay and overall
        // timeout — confirm against DatanodeMigrationLockWaitConfig. The callback logs every wait attempt.
        final DatanodeMigrationLockWaitConfig waitConfig = new DatanodeMigrationLockWaitConfig(
                java.time.Duration.ofSeconds(5L),
                java.time.Duration.ofMinutes(30L),
                (indexSet, caller, attemptNumber) -> this.logInfo(migration, "Awaiting lock of index set " + indexSet.getConfig().title() + ", attempt #" + attemptNumber));
        return migration.indices().stream()
                .map(IndexMigrationConfiguration::indexName)
                .map(this.indexSetRegistry::getForIndex)
                .flatMap(Optional::stream)
                .distinct()
                .map(owner -> this.migrationLockService.acquireLock(owner, RemoteReindexingMigrationAdapterOS2.class, migration.id(), waitConfig))
                .collect(Collectors.toSet());
    }

    /**
     * Ensures every index of the migration exists in the target indexer. Existing indices are reused;
     * missing ones are created in the index set resolved for the index name, or in the default index set.
     *
     * @param migration migration whose indices are created
     * @throws IllegalStateException if an index could not be created
     */
    private void createIndicesInNewCluster(MigrationConfiguration migration) {
        for (IndexMigrationConfiguration index : migration.indices()) {
            final String indexName = index.indexName();
            if (this.indices.exists(indexName)) {
                this.logInfo(migration, String.format(Locale.ROOT, "Index %s does already exist in target indexer. Data will be migrated into existing index.", indexName));
                continue;
            }
            // orElse (not orElseGet): the default index set is looked up eagerly, as before.
            final IndexSet owner = this.indexSetRegistry.getForIndex(indexName).orElse(this.indexSetRegistry.getDefault());
            if (!this.indices.create(indexName, owner)) {
                final String message = "Could not create new target index <" + indexName + ">.";
                this.logError(migration, message, null);
                throw new IllegalStateException(message);
            }
            this.logInfo(migration, "Created new target index " + indexName);
        }
    }

    /**
     * Makes sure the cluster's remote reindex allowlist matches the request. If it already matches,
     * nothing is changed; otherwise the allowlist event is posted and the method waits for the cluster
     * to come back with the new setting.
     *
     * @param req       request providing the remote URI and the allowlist value
     * @param migration migration used for log entries
     */
    private void prepareCluster(RemoteReindexRequest req, MigrationConfiguration migration) {
        final RemoteReindexAllowlist allowlist = new RemoteReindexAllowlist(req.uri(), req.allowlist());
        final boolean alreadyConfigured = allowlist.isClusterSettingMatching(this.clusterAllowlistSetting());
        if (alreadyConfigured) {
            this.logInfo(migration, "Remote reindex allowlist already configured, skipping cluster configuration and restart.");
            return;
        }
        this.logInfo(migration, "Preparing cluster for remote reindex migration " + migration.id() + ", setting allowlist to: " + req.allowlist());
        this.allowReindexing(allowlist, migration);
        this.waitForClusterRestart(allowlist, migration);
    }

    /**
     * Builds a reindex request that copies {@code index} from the remote host into the local index of
     * the same name and stores the task result.
     *
     * @param index     name used for both source and destination index
     * @param query     serialized query selecting the documents to copy
     * @param uri       remote host; scheme, host, port and path are passed to {@link RemoteInfo}
     * @param username  remote user name
     * @param password  remote password
     * @param migration not used by this method
     * @return the configured request
     */
    private ReindexRequest createReindexRequest(String index, BytesReference query, URI uri, String username, String password, MigrationConfiguration migration) {
        final ReindexRequest reindexRequest = new ReindexRequest();
        final RemoteInfo source = new RemoteInfo(
                uri.getScheme(), uri.getHost(), uri.getPort(), uri.getPath(),
                query, username, password, Map.of(),
                RemoteInfo.DEFAULT_SOCKET_TIMEOUT, RemoteInfo.DEFAULT_CONNECT_TIMEOUT);
        reindexRequest.setRemoteInfo(source);
        reindexRequest.setSourceIndices(index);
        reindexRequest.setDestIndex(index);
        reindexRequest.setShouldStoreResult(true);
        return reindexRequest;
    }

    /**
     * Assembles the current state of a migration from its stored configuration and the state of the
     * background task of each index.
     *
     * @param migrationID id of the migration
     * @return the migration with its indices sorted by name, or
     *         {@code RemoteReindexMigration.nonExistent(migrationID)} if no configuration is stored
     */
    public RemoteReindexMigration status(@Nonnull String migrationID) {
        return this.reindexMigrationService.getMigration(migrationID)
                .map(configuration -> {
                    // Task lookups are issued through a parallel stream, one per configured index.
                    final List<RemoteReindexIndex> indexStates = configuration.indices().parallelStream()
                            .map(indexConfig -> {
                                final String indexName = indexConfig.indexName();
                                return indexConfig.taskId()
                                        .flatMap(this::getTask)
                                        .map(task -> this.taskToIndex(indexName, task))
                                        .orElse(RemoteReindexIndex.noBackgroundTaskYet(indexName));
                            })
                            .sorted(Comparator.comparing(RemoteReindexIndex::name))
                            .collect(Collectors.toList());
                    return new RemoteReindexMigration(migrationID, indexStates, configuration.logs());
                })
                .orElse(RemoteReindexMigration.nonExistent(migrationID));
    }

    /**
     * Converts a task response into the per-index view of a migration.
     *
     * @param indexName name of the index the task belongs to
     * @param task      task response to convert
     * @return RUNNING while the task is not completed; otherwise ERROR with the error text when
     *         {@code getErrors} reports one, or FINISHED
     */
    private RemoteReindexIndex taskToIndex(String indexName, GetTaskResponse task) {
        final DateTime startedAt = new DateTime(task.task().startTimeInMillis(), DateTimeZone.UTC);
        final Duration elapsed = RemoteReindexingMigrationAdapterOS2.getDuration(task);
        final IndexMigrationProgress progress = this.toProgress(task.task().status());
        final String taskID = task.task().taskID();

        final RemoteReindexingMigrationAdapter.Status state;
        String errors = null;
        if (!task.completed()) {
            state = RemoteReindexingMigrationAdapter.Status.RUNNING;
        } else {
            // Errors are only evaluated for completed tasks.
            errors = this.getErrors(task);
            state = errors != null
                    ? RemoteReindexingMigrationAdapter.Status.ERROR
                    : RemoteReindexingMigrationAdapter.Status.FINISHED;
        }
        return new RemoteReindexIndex(taskID, indexName, state, startedAt, elapsed, progress, errors);
    }

    /**
     * Copies the document counters of a task status into an {@link IndexMigrationProgress}.
     *
     * @param status task status providing the counters
     * @return progress built from total, created, updated, deleted, version conflicts and noops
     */
    private IndexMigrationProgress toProgress(TaskStatus status) {
        final var total = status.total();
        final var created = status.created();
        final var updated = status.updated();
        final var deleted = status.deleted();
        final var versionConflicts = status.versionConflicts();
        final var noops = status.noops();
        return new IndexMigrationProgress(total, created, updated, deleted, versionConflicts, noops);
    }

    /**
     * Derives an error description from a task response. Sources are checked in this order:
     * the task-level error, the failures of the task status, the failures of the task response.
     *
     * @param task task response to inspect
     * @return the string form of the whole task response when it carries an error; the status failures
     *         joined by {@code ;}; the distinct "type: reason" causes of the response failures joined by
     *         {@code ;}; or {@code null} when none of these is present
     */
    @Nullable
    private String getErrors(GetTaskResponse task) {
        if (task.error() != null) {
            // The complete response is rendered here, not only the error element.
            return task.toString();
        }
        final TaskStatus status = task.task().status();
        if (status.hasFailures()) {
            return String.join((CharSequence)";", status.failures());
        }
        final var responseFailures = task.response().failures();
        if (responseFailures == null || responseFailures.isEmpty()) {
            return null;
        }
        return responseFailures.stream()
                .map(TaskResponseFailure::cause)
                .filter(Objects::nonNull)
                .map(cause -> cause.type() + ": " + cause.reason())
                .distinct()
                .collect(Collectors.joining(";"));
    }

    /**
     * Validates the allowlist for the remote host and asks the datanodes to list the remote indices.
     *
     * @param remoteHost        remote indexer URI
     * @param username          remote user name, may be {@code null}
     * @param password          remote password, may be {@code null}
     * @param allowlist         allowlist value, may be {@code null}
     * @param trustUnknownCerts forwarded to the connection check request
     * @return success with the distinct remote indices, or failure carrying the aggregated error text or
     *         the exception thrown while checking
     */
    public IndexerConnectionCheckResult checkConnection(@Nonnull URI remoteHost, @Nullable String username, @Nullable String password, @Nullable String allowlist, boolean trustUnknownCerts) {
        try {
            new RemoteReindexAllowlist(remoteHost, allowlist).validate();
            final AggregatedConnectionResponse aggregated = this.getAllIndicesFrom(remoteHost, username, password, trustUnknownCerts);
            // The index list is built before the error check, exactly as in the previous form.
            final List<RemoteIndex> remoteIndices = aggregated.indices().stream()
                    .map(found -> new RemoteIndex(found.name(), this.indexSetRegistry.isManagedIndex(found.name()), found.closed()))
                    .distinct()
                    .toList();
            final boolean failed = aggregated.error() != null && !aggregated.error().isEmpty();
            return failed
                    ? IndexerConnectionCheckResult.failure((String)aggregated.error())
                    : IndexerConnectionCheckResult.success(remoteIndices);
        }
        catch (Exception e) {
            return IndexerConnectionCheckResult.failure((Exception)e);
        }
    }

    /**
     * Returns the id of the latest migration as reported by the migration service.
     *
     * @return the latest migration id, or an empty optional if the service reports none
     */
    public Optional<String> getLatestMigrationId() {
        final Optional<String> latestId = this.reindexMigrationService.getLatestMigrationId();
        return latestId;
    }

    /**
     * Reads the {@code reindex.remote.allowlist} cluster setting, with defaults included in the lookup.
     *
     * @return the value returned by the settings response for that key
     */
    private String clusterAllowlistSetting() {
        final ClusterGetSettingsRequest settingsRequest = new ClusterGetSettingsRequest();
        settingsRequest.includeDefaults(true);
        return this.client.execute((restHighLevelClient, requestOptions) ->
                restHighLevelClient.cluster()
                        .getSettings(settingsRequest, (RequestOptions)requestOptions)
                        .getSetting("reindex.remote.allowlist"));
    }

    /**
     * Polls the cluster until it reports ready with the expected allowlist. The state is checked up to
     * 40 times with a fixed 3 second wait between attempts; attempts that throw or return a
     * not-ready state are retried.
     *
     * @param allowlist allowlist the cluster setting has to match
     * @param migration migration used for log entries
     * @throws RuntimeException if the cluster is not ready after the last attempt; the retry failure is
     *                          attached as the cause
     */
    private void waitForClusterRestart(RemoteReindexAllowlist allowlist, MigrationConfiguration migration) {
        final Retryer<RemoteReindexConfigurationStatus> retryer = RetryerBuilder.<RemoteReindexConfigurationStatus>newBuilder()
                .withWaitStrategy(WaitStrategies.fixedWait(3L, TimeUnit.SECONDS))
                .withStopStrategy(StopStrategies.stopAfterAttempt(40))
                .withRetryListener(this.createClusterRestartWaitListener(migration))
                .retryIfException()
                .retryIfResult(status -> !status.isClusterReady())
                .build();
        try {
            retryer.call(() -> this.remoteReindexClusterState(allowlist));
            this.logInfo(migration, "Datanode cluster successfully reconfigured and restarted.");
        }
        catch (RetryException | ExecutionException e) {
            final String message = "Cluster failed to restart after 120 seconds.";
            this.logError(migration, message, e);
            // Keep the retry failure as cause so the last attempt's error is not lost for callers.
            throw new RuntimeException(message, e);
        }
    }

    /**
     * Creates a retry listener that writes one migration log entry per attempt while waiting for the
     * cluster restart.
     *
     * @param migration migration that receives the log entries
     * @return listener logging the attempt number and, when the attempt produced a result, the cluster
     *         health and allowlist state
     */
    @Nonnull
    private RetryListener createClusterRestartWaitListener(final MigrationConfiguration migration) {
        return new RetryListener(){

            @Override
            public <V> void onRetry(Attempt<V> attempt) {
                // An attempt without a result failed with an exception; only the attempt number is known.
                if (!attempt.hasResult()) {
                    RemoteReindexingMigrationAdapterOS2.this.logInfo(migration, "Waiting for datanode cluster to reconfigure and restart, attempt #" + attempt.getAttemptNumber());
                    return;
                }
                final RemoteReindexConfigurationStatus state = (RemoteReindexConfigurationStatus)attempt.getResult();
                final String progressMessage = String.format(Locale.ROOT, "Waiting for datanode cluster to reconfigure and restart, attempt %d. Cluster health: %s, allowlist configured: %b (cluster setting value: %s).", attempt.getAttemptNumber(), state.status(), state.allowlistConfigured(), state.clusterAllowlistSetting());
                RemoteReindexingMigrationAdapterOS2.this.logInfo(migration, progressMessage);
            }
        };
    }

    /**
     * Takes a snapshot of the cluster state relevant for remote reindexing: cluster health, the current
     * allowlist setting and whether that setting matches the expected allowlist.
     *
     * <p>The health request uses the request options handed in by the client, like every other
     * {@code client.execute} call in this class, instead of {@code RequestOptions.DEFAULT}.
     *
     * @param allowlist allowlist the cluster setting is compared against
     * @return the combined status
     */
    private RemoteReindexConfigurationStatus remoteReindexClusterState(RemoteReindexAllowlist allowlist) {
        ClusterHealthResponse clusterHealth = this.client.execute((restHighLevelClient, requestOptions) -> restHighLevelClient.cluster().health(new ClusterHealthRequest(), (RequestOptions)requestOptions));
        String clusterAllowlistSetting = this.clusterAllowlistSetting();
        boolean remoteReindexAllowed = allowlist.isClusterSettingMatching(clusterAllowlistSetting);
        return new RemoteReindexConfigurationStatus(remoteReindexAllowed, clusterAllowlistSetting, clusterHealth.getStatus());
    }

    /**
     * Posts an allowlist "add" event on the cluster event bus. The migration's certificates are
     * included in the event when the migration has a non-empty certificate collection.
     *
     * @param allowlist allowlist whose value is announced
     * @param migration migration providing the optional certificates
     */
    void allowReindexing(RemoteReindexAllowlist allowlist, MigrationConfiguration migration) {
        final boolean hasCertificates = migration.certificates() != null && !migration.certificates().isEmpty();
        final Object event;
        if (hasCertificates) {
            event = RemoteReindexAllowlistEvent.add(allowlist.value(), (List)migration.certificates());
        } else {
            event = RemoteReindexAllowlistEvent.add(allowlist.value());
        }
        this.eventBus.post(event);
    }

    /**
     * Sends a connection check for the remote host to the datanodes addressed by {@code "all"} and
     * wraps their responses.
     *
     * @param uri               remote host
     * @param username          remote user name
     * @param password          remote password
     * @param trustUnknownCerts forwarded to the connection check request
     * @return aggregation of the returned responses
     */
    private AggregatedConnectionResponse getAllIndicesFrom(URI uri, String username, String password, boolean trustUnknownCerts) {
        final String remoteUrl = RemoteReindexingMigrationAdapterOS2.uriToString(uri);
        final ConnectionCheckRequest checkRequest = new ConnectionCheckRequest(remoteUrl, username, password, trustUnknownCerts);
        final Map perNodeResponses = this.datanodeRestApiProxy.remoteInterface(
                "all",
                DatanodeRemoteConnectionCheckResource.class,
                resource -> resource.opensearch(checkRequest));
        return new AggregatedConnectionResponse(perNodeResponses);
    }

    /**
     * Renders a URI as the string form of the equivalent URL.
     *
     * @param uri URI to convert
     * @return the URL string
     * @throws RuntimeException wrapping the {@link MalformedURLException} when the URI cannot be
     *                          converted to a URL
     */
    private static String uriToString(URI uri) {
        final String rendered;
        try {
            rendered = String.valueOf(uri.toURL());
        }
        catch (MalformedURLException malformed) {
            throw new RuntimeException(malformed);
        }
        return rendered;
    }

    /**
     * Starts one reindex job per index of the migration on a dedicated fixed-size pool of daemon threads.
     * The pool size is the requested thread count, capped by the number of indices and at least 1.
     *
     * <p>The pool is shut down right after all jobs are queued. {@code shutdown()} lets queued and
     * running jobs finish and then ends the worker threads; without it every migration would leave an
     * idle thread pool behind. Jobs are handed over with {@code execute} so that anything escaping a
     * job reaches the configured uncaught-exception handler instead of being kept in an unread future.
     *
     * @param migration migration whose indices are reindexed
     * @param request   request providing connection details and the thread count
     * @param locks     locks passed on to each reindex job
     */
    private void startAsyncTasks(MigrationConfiguration migration, RemoteReindexRequest request, Set<Lock> locks) {
        int threadsCount = Math.max(1, Math.min(request.threadsCount(), migration.indices().size()));
        ExecutorService executorService = Executors.newFixedThreadPool(threadsCount, new ThreadFactoryBuilder().setNameFormat("remote-reindex-migration-backend-%d").setDaemon(true).setUncaughtExceptionHandler((Thread.UncaughtExceptionHandler)new Tools.LogUncaughtExceptionHandler(LOG)).build());
        try {
            migration.indices().forEach(index -> executorService.execute(() -> this.executeReindexAsync(migration, request.uri(), request.username(), request.password(), (IndexMigrationConfiguration)index, locks)));
        }
        finally {
            // Does not cancel anything: already queued jobs still run to completion.
            executorService.shutdown();
        }
    }

    /**
     * Reindexes a single index from the remote host into the local index of the same name and blocks
     * until the submitted reindex task is no longer running.
     *
     * <p>Closed source or target indices are opened first, a write block on the local index is removed,
     * and the reindex task is submitted with the migration id as {@code X-Opaque-Id} header. The
     * collected post-migration actions (closing indices again) run only when everything before them
     * completed without an exception. Any exception is logged to the migration log and not rethrown.
     *
     * @param migration migration the index belongs to
     * @param uri       remote host
     * @param username  remote user name
     * @param password  remote password
     * @param index     configuration of the index to reindex
     * @param locks     locks handed on to the task completion handling
     */
    private void executeReindexAsync(MigrationConfiguration migration, URI uri, String username, String password, IndexMigrationConfiguration index, Set<Lock> locks) {
        String indexName = index.indexName();
        try (XContentBuilder builder = JsonXContent.contentBuilder().prettyPrint();){
            boolean localClosed;
            // Actions to run after a successful reindex, in insertion order.
            ArrayList<Runnable> postMigrationActions = new ArrayList<Runnable>();
            boolean remoteClosed = this.getRemoteIndexState(uri, username, password, indexName) == IndexState.CLOSE;
            // `bl` is a decompiler artifact; only `localClosed` is read afterwards.
            boolean bl = localClosed = this.getLocalIndexState(indexName) == IndexState.CLOSE;
            if (remoteClosed) {
                this.openRemoteIndex(migration, uri, username, password, indexName);
                postMigrationActions.add(() -> this.closeRemoteIndex(migration, uri, username, password, indexName));
            }
            if (localClosed) {
                this.openLocalIndex(migration, indexName);
            }
            // The local index is closed afterwards if either side was closed before the migration.
            if (remoteClosed || localClosed) {
                postMigrationActions.add(() -> this.closeLocalIndex(migration, indexName));
            }
            // Only blocks whose levels contain `write` are returned by retrieveIndexBlock.
            this.retrieveIndexBlock(indexName).ifPresent(block -> this.removeLocalBlock(migration, indexName, (BlockResponse.IndexBlock)block));
            // Match-all query: every document of the source index is selected.
            BytesReference query = BytesReference.bytes(QueryBuilders.matchAllQuery().toXContent(builder, ToXContent.EMPTY_PARAMS));
            this.logInfo(migration, "Executing async reindex for " + indexName);
            TaskSubmissionResponse task = this.client.execute((c, requestOptions) -> {
                RequestOptions withHeader = requestOptions.toBuilder().addHeader("X-Opaque-Id", migration.id()).build();
                return c.submitReindexTask(this.createReindexRequest(indexName, query, uri, username, password, migration), withHeader);
            });
            // Persist the task id before waiting so the migration status can look the task up.
            this.reindexMigrationService.assignTask(migration.id(), indexName, task.getTask());
            this.waitForTaskCompleted(migration, indexName, task.getTask(), locks);
            postMigrationActions.forEach(Runnable::run);
        }
        catch (Exception e) {
            // Failure of one index is recorded in the migration log; the exception does not propagate.
            String message = "Could not reindex index: " + indexName + " - " + RemoteReindexingMigrationAdapterOS2.formatErrorMessage(e);
            this.logError(migration, message, e);
        }
    }

    /**
     * Builds a short, human-readable description of an exception for the migration log.
     *
     * <p>The exception's own message and the message of its direct cause are joined by a single space.
     * Missing parts are skipped, so the result never starts with a stray space. When neither message
     * is available, the exception's class name is returned so the log entry is never empty.
     *
     * @param e exception to describe
     * @return the combined messages, or the class name of {@code e} if there is no message at all
     */
    private static String formatErrorMessage(Exception e) {
        final List<String> parts = new ArrayList<>(2);
        if (e.getMessage() != null) {
            parts.add(e.getMessage());
        }
        final Throwable cause = e.getCause();
        if (cause != null && cause.getMessage() != null) {
            parts.add(cause.getMessage());
        }
        if (parts.isEmpty()) {
            return e.getClass().getName();
        }
        return String.join(" ", parts);
    }

    /**
     * Logs the block found on a local index and then switches off its {@code index.blocks.write} and
     * {@code index.blocks.read_only_allow_delete} settings. The response of the settings update is not
     * inspected.
     *
     * @param migration migration used for the log entry
     * @param indexName index to unblock
     * @param block     block that was found; only its description is used
     */
    private void removeLocalBlock(MigrationConfiguration migration, String indexName, BlockResponse.IndexBlock block) {
        this.logInfo(migration, "Index " + indexName + " is blocked: " + block.description() + ". Removing the block now.");
        final UpdateSettingsRequest unblockRequest = new UpdateSettingsRequest();
        unblockRequest.indices(indexName);
        unblockRequest.settings(Map.of("index.blocks.write", false, "index.blocks.read_only_allow_delete", false));
        this.client.execute((restHighLevelClient, requestOptions) ->
                restHighLevelClient.indices().putSettings(unblockRequest, (RequestOptions)requestOptions));
    }

    /**
     * Loads the cluster's block state and looks up the block of the given index.
     *
     * @param indexName index to look up
     * @return the index's block if one exists and its levels contain {@code write}, otherwise empty
     */
    private Optional<BlockResponse.IndexBlock> retrieveIndexBlock(String indexName) {
        return this.client.execute((restHighLevelClient, requestOptions) -> {
            final Request blocksRequest = new Request("GET", "_cluster/state/blocks/");
            final Response blocksResponse = restHighLevelClient.getLowLevelClient().performRequest(blocksRequest);
            try (InputStream body = blocksResponse.getEntity().getContent()) {
                final BlockResponse parsed = (BlockResponse)this.objectMapper.readValue(body, BlockResponse.class);
                return parsed.blocks()
                        .forIndex(indexName)
                        .filter(candidate -> candidate.levels().contains((Object)BlockResponse.BlockLevel.write));
            }
        });
    }

    /**
     * Writes a log entry and closes the given index through the local client.
     *
     * @param migration migration used for the log entry
     * @param indexName index to close
     */
    private void closeLocalIndex(MigrationConfiguration migration, String indexName) {
        this.logInfo(migration, "Target index " + indexName + " is being closed after index migration");
        final CloseIndexRequest closeRequest = new CloseIndexRequest(indexName);
        this.client.execute((restHighLevelClient, requestOptions) ->
                restHighLevelClient.indices().close(closeRequest, (RequestOptions)requestOptions));
    }

    /**
     * Writes a log entry and opens the given index through the local client.
     *
     * @param migration migration used for the log entry
     * @param indexName index to open
     */
    private void openLocalIndex(MigrationConfiguration migration, String indexName) {
        this.logInfo(migration, "Target index " + indexName + " is closed, reopening for the migration");
        final OpenIndexRequest openRequest = new OpenIndexRequest(indexName);
        this.client.execute((restHighLevelClient, requestOptions) ->
                restHighLevelClient.indices().open(openRequest, (RequestOptions)requestOptions));
    }

    /**
     * Writes a log entry and asks a datanode (addressed by {@code "any"}) to set the remote index to
     * {@link IndexState#OPEN}.
     *
     * @param migration migration used for the log entry
     * @param uri       remote host, sent in its string form
     * @param username  remote user name
     * @param password  remote password
     * @param indexName remote index to open
     */
    private void openRemoteIndex(MigrationConfiguration migration, URI uri, String username, String password, String indexName) {
        this.logInfo(migration, "Source index " + indexName + " is closed, reopening for the migration");
        final IndexStateChangeRequest openRequest =
                new IndexStateChangeRequest(indexName, IndexState.OPEN, uri.toString(), username, password);
        this.datanodeRestApiProxy.remoteInterface(
                "any",
                DatanodeRemoteIndexStateResource.class,
                stateResource -> stateResource.changeState(openRequest));
    }

    /**
     * Writes a log entry and asks a datanode (addressed by {@code "any"}) to set the remote index to
     * {@link IndexState#CLOSE}.
     *
     * @param migration migration used for the log entry
     * @param uri       remote host, sent in its string form
     * @param username  remote user name
     * @param password  remote password
     * @param indexName remote index to close
     */
    private void closeRemoteIndex(MigrationConfiguration migration, URI uri, String username, String password, String indexName) {
        this.logInfo(migration, "Source index " + indexName + " is being closed after index migration");
        final IndexStateChangeRequest closeRequest =
                new IndexStateChangeRequest(indexName, IndexState.CLOSE, uri.toString(), username, password);
        this.datanodeRestApiProxy.remoteInterface(
                "any",
                DatanodeRemoteIndexStateResource.class,
                stateResource -> stateResource.changeState(closeRequest));
    }

    /**
     * Asks a datanode (addressed by {@code "any"}) for the state of a remote index.
     *
     * @param uri       remote host, sent in its string form
     * @param username  remote user name
     * @param password  remote password
     * @param indexName remote index to inspect
     * @return the first value of the returned response map
     * @throws java.util.NoSuchElementException if the response map is empty
     */
    private IndexState getRemoteIndexState(URI uri, String username, String password, String indexName) {
        final IndexStateGetRequest stateRequest = new IndexStateGetRequest(indexName, uri.toString(), username, password);
        final var statesByNode = this.datanodeRestApiProxy.remoteInterface(
                "any",
                DatanodeRemoteIndexStateResource.class,
                resource -> resource.readState(stateRequest));
        // Only the first entry is used; the map is expected to hold the answer of the single node asked.
        final Object firstState = statesByNode.values().iterator().next();
        return (IndexState)firstState;
    }

    /**
     * Reads the status column of {@code _cat/indices} for a local index and maps it to an
     * {@link IndexState}.
     *
     * @param indexName local index to inspect
     * @return the state whose name equals the trimmed, upper-cased response body
     * @throws IllegalArgumentException if the response body does not name an {@link IndexState}
     */
    private IndexState getLocalIndexState(String indexName) {
        final String endpoint = "_cat/indices/" + indexName + "/?h=status";
        return this.client.execute((restHighLevelClient, requestOptions) -> {
            final Response statusResponse = restHighLevelClient.getLowLevelClient().performRequest(new Request("GET", endpoint));
            try (InputStream body = statusResponse.getEntity().getContent()) {
                final String rawStatus = IOUtils.toString(body, StandardCharsets.UTF_8);
                final String stateName = rawStatus.trim().toUpperCase(Locale.ROOT);
                return IndexState.valueOf(stateName);
            }
        });
    }

    /**
     * Writes an INFO message to the application log and appends it, stamped with the current UTC time,
     * to the migration's log.
     *
     * @param migration migration that receives the log entry
     * @param message   text to log
     */
    private void logInfo(MigrationConfiguration migration, String message) {
        LOG.info(message);
        final LogEntry entry = new LogEntry(DateTime.now((DateTimeZone)DateTimeZone.UTC), LogLevel.INFO, message);
        this.reindexMigrationService.appendLogEntry(migration.id(), entry);
    }

    /**
     * Writes an ERROR message to the application log (with the exception, when one is given) and
     * appends the message, stamped with the current UTC time, to the migration's log. The exception
     * itself is not stored in the migration log.
     *
     * @param migration migration that receives the log entry
     * @param message   text to log
     * @param error     exception to attach to the application log entry, may be {@code null}
     */
    private void logError(MigrationConfiguration migration, String message, Exception error) {
        if (error == null) {
            LOG.error(message);
        } else {
            LOG.error(message, (Throwable)error);
        }
        final LogEntry entry = new LogEntry(DateTime.now((DateTimeZone)DateTimeZone.UTC), LogLevel.ERROR, message);
        this.reindexMigrationService.appendLogEntry(migration.id(), entry);
    }

    /**
     * Blocks until the task is no longer reported as running, sleeping between checks, and then runs
     * the task completion handling.
     *
     * @param migration migration the task belongs to
     * @param indexName index the task reindexes
     * @param taskID    id of the task to wait for
     * @param locks     locks handed on to the completion handling
     */
    private void waitForTaskCompleted(MigrationConfiguration migration, String indexName, String taskID, Set<Lock> locks) {
        while (true) {
            final boolean running = this.taskIsStillRunning(taskID);
            if (!running) {
                break;
            }
            RemoteReindexingMigrationAdapterOS2.sleep();
        }
        this.onTaskFinished(migration, indexName, taskID, locks);
    }

    /**
     * Tells whether a task has to be waited for.
     *
     * @param taskID id of the task
     * @return {@code true} if the task is not completed, and also if the task cannot be found
     */
    private boolean taskIsStillRunning(String taskID) {
        final Optional<GetTaskResponse> task = this.getTask(taskID);
        // NOTE(review): a task that is never found keeps the caller waiting indefinitely — confirm this is intended.
        if (!task.isPresent()) {
            return true;
        }
        return !task.get().completed();
    }

    /**
     * Pauses the calling thread for 1000 ms between two task status checks.
     *
     * @throws RuntimeException wrapping the {@link InterruptedException} if the thread is interrupted
     *                          while sleeping; the thread's interrupt flag is set again first so code
     *                          further up the stack can still see the interruption
     */
    private static void sleep() {
        try {
            Thread.sleep(1000L);
        }
        catch (InterruptedException e) {
            // Thread.sleep clears the flag when it throws; restore it before converting the exception.
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        }
    }

    /**
     * Fetches a task from the {@code _tasks} API and parses the response body.
     *
     * <p>A missing task is reported as an empty optional in both ways it can surface: as a response
     * with status 404, and as an {@link OpenSearchException} whose cause is a {@link ResponseException}
     * with status 404. The request, the status check and the parsing are all inside the same
     * {@code try}, so the 404 handling also covers exceptions thrown by the request itself. The
     * response stream is closed by try-with-resources.
     *
     * @param taskID id of the task to fetch
     * @return the parsed task response, or empty if the task does not exist
     * @throws OpenSearchException if the request fails for any reason other than a 404
     * @throws RuntimeException    wrapping an {@link IOException} raised while reading the response
     */
    private Optional<GetTaskResponse> getTask(String taskID) {
        try {
            final Response taskResponse = this.client.execute((restHighLevelClient, requestOptions) -> restHighLevelClient.getLowLevelClient().performRequest(new Request("GET", "_tasks/" + taskID)));
            if (taskResponse.getStatusLine().getStatusCode() == 404) {
                return Optional.empty();
            }
            try (InputStream is = taskResponse.getEntity().getContent()) {
                return Optional.of(this.objectMapper.readValue(is, GetTaskResponse.class));
            }
        }
        catch (OpenSearchException e) {
            if (e.getCause() instanceof ResponseException responseException && responseException.getResponse().getStatusLine().getStatusCode() == 404) {
                return Optional.empty();
            }
            throw e;
        }
        catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * Handles the end of a single task: reports its outcome, refreshes the index ranges of the
     * affected index and, if the migration as a whole has ended, runs the migration-finished handling.
     *
     * <p>The outcome is reported only when the task can still be looked up. A non-null result of
     * {@code getErrors} routes to {@code onTaskFailure}, otherwise to {@code onTaskSuccess}.
     *
     * @param migration migration configuration the task belongs to
     * @param index     name of the index handled by the task
     * @param taskID    identifier of the finished task
     * @param locks     locks passed to {@code onMigrationFinished} once the migration status is
     *                  {@code FINISHED} or {@code ERROR}
     */
    private void onTaskFinished(MigrationConfiguration migration, String index, String taskID, Set<Lock> locks) {
        final Optional<GetTaskResponse> finishedTask = this.getTask(taskID);
        if (finishedTask.isPresent()) {
            final GetTaskResponse response = finishedTask.get();
            final Duration elapsed = RemoteReindexingMigrationAdapterOS2.getDuration(response);
            final String errors = this.getErrors(response);
            if (errors == null) {
                this.onTaskSuccess(migration, index, response.task(), elapsed);
            } else {
                this.onTaskFailure(migration, index, errors, elapsed);
            }
        }

        this.recalculateIndexRanges(index);

        final RemoteReindexingMigrationAdapter.Status status = this.getMigrationStatus(migration);
        if (status != RemoteReindexingMigrationAdapter.Status.FINISHED && status != RemoteReindexingMigrationAdapter.Status.ERROR) {
            // Migration has not reached a terminal state yet; nothing more to do for this task.
            return;
        }
        this.onMigrationFinished(status, locks);
    }

    /**
     * Refreshes the given index and submits a system job, created by the single-index range job
     * factory, for it.
     *
     * <p>If the job manager rejects the job with a {@code SystemJobConcurrencyException}, the
     * failure is logged as a warning and otherwise ignored.
     *
     * @param index name of the index to refresh and to create the job for
     */
    private void recalculateIndexRanges(String index) {
        final String[] targets = new String[]{index};
        this.indices.refresh(targets);
        try {
            final SystemJob rangeJob = this.singleIndexRangeJobFactory.create(Set.of(), index);
            this.systemJobManager.submit(rangeJob);
        }
        catch (SystemJobConcurrencyException rejected) {
            LOG.warn("Unable to trigger index range calculation for index: {}", (Object)index, (Object)rejected);
        }
    }

    /**
     * Runs the end-of-migration steps: logs completion, triggers a recalculation of all index
     * ranges, publishes the system notification for the given status and releases the locks.
     *
     * <p>The locks are released in a {@code finally} block. {@code recaluculateAllIndexRanges} can
     * throw a {@code ForbiddenException}; without the {@code finally} such a failure would leave
     * every lock held.
     *
     * @param status final status of the migration, used for the notification
     * @param locks  locks to release through the migration lock service
     */
    private void onMigrationFinished(RemoteReindexingMigrationAdapter.Status status, Set<Lock> locks) {
        LOG.info("Remote reindexing migration finished");
        try {
            this.recaluculateAllIndexRanges();
            this.createSystemNotification(status);
        }
        finally {
            locks.forEach(lock -> this.migrationLockService.release(lock));
        }
    }

    /**
     * Runs the index ranges cleanup periodical once and then submits a rebuild job covering all
     * index sets known to the index set registry.
     *
     * @throws ForbiddenException if the job manager rejects the rebuild job with a
     *                            {@code SystemJobConcurrencyException}; the rejection is logged and
     *                            attached as the cause
     */
    private void recaluculateAllIndexRanges() {
        this.indexRangesCleanupPeriodical.doRun();
        final RebuildIndexRangesJob rebuildJob = this.rebuildIndexRangesJobFactory.create(this.indexSetRegistry.getAll());
        try {
            this.systemJobManager.submit((SystemJob)rebuildJob);
        }
        catch (SystemJobConcurrencyException e) {
            final String errorMsg = "Concurrency level of this job reached: " + e.getMessage();
            LOG.error(errorMsg, (Throwable)e);
            // Keep the original exception as the cause so the stack trace is not lost.
            throw new ForbiddenException(errorMsg, (Throwable)e);
        }
    }

    /**
     * Resolves the current status of the given migration.
     *
     * <p>The migration's id is passed to {@code status(...)} and the status of the returned
     * {@code RemoteReindexMigration} is used. If the migration, its id, the looked-up migration or
     * its status is {@code null}, {@code RUNNING} is returned instead.
     *
     * @param migration migration configuration, may be {@code null}
     * @return the resolved status, never {@code null}
     */
    private RemoteReindexingMigrationAdapter.Status getMigrationStatus(MigrationConfiguration migration) {
        final Optional<RemoteReindexMigration> current = Optional.ofNullable(migration)
                .map(config -> config.id())
                .map(id -> this.status(id));
        final Optional<RemoteReindexingMigrationAdapter.Status> resolved = current.map(state -> state.status());
        return resolved.orElse(RemoteReindexingMigrationAdapter.Status.RUNNING);
    }

    /**
     * Converts the running time of a task, reported in nanoseconds, into a {@code Duration} with
     * whole-second precision (fractions of a second are truncated by the unit conversion).
     *
     * @param t task response whose {@code task().runningTimeInNanos()} is read
     * @return the running time as a duration of whole seconds
     */
    private static Duration getDuration(GetTaskResponse t) {
        final long wholeSeconds = TimeUnit.NANOSECONDS.toSeconds(t.task().runningTimeInNanos());
        return Duration.standardSeconds(wholeSeconds);
    }

    /**
     * Reports a failed index migration through {@code logError}.
     *
     * @param migration migration configuration the message is logged against
     * @param index     name of the index whose migration failed
     * @param error     error description included in the message
     * @param duration  time the task ran, rendered via {@code humanReadable}
     */
    private void onTaskFailure(MigrationConfiguration migration, String index, String error, Duration duration) {
        final String elapsed = this.humanReadable(duration);
        final String failureMessage = String.format(Locale.ROOT, "Index %s migration failed after %s: %s.", index, elapsed, error);
        // No exception is attached: the failure is described by the error text only.
        this.logError(migration, failureMessage, null);
    }

    /**
     * Reports a successfully finished index migration through {@code logInfo}.
     *
     * <p>The message always contains the document totals from the task status. When the status
     * reports any noops or version conflicts, a second sentence about documents that were not
     * migrated is appended.
     *
     * @param migration migration configuration the message is logged against
     * @param index     name of the migrated index
     * @param task      finished task; its id and status counters are read
     * @param duration  time the task ran, rendered via {@code humanReadable}
     */
    private void onTaskSuccess(MigrationConfiguration migration, String index, Task task, Duration duration) {
        final TaskStatus stats = task.status();
        final String summary = String.format(Locale.ROOT, "Index %s, task %s finished migration after %s. Total %d documents, updated %d, created %d, deleted %d.", index, task.taskID(), this.humanReadable(duration), stats.total(), stats.updated(), stats.created(), stats.deleted());
        final boolean hasSkippedDocuments = stats.noops() > 0L || stats.versionConflicts() > 0L;
        final String skippedNote = hasSkippedDocuments
                ? String.format(Locale.ROOT, " %d documents were not migrated (%d version conflicts, %d ignored)", stats.versionConflicts() + stats.noops(), stats.versionConflicts(), stats.noops())
                : "";
        this.logInfo(migration, summary + skippedNote);
    }

    /**
     * Renders a duration as words using {@code DurationFormatUtils.formatDurationWords}, with both
     * of that method's suppression flags (leading and trailing zero elements) enabled.
     *
     * @param duration duration to render; its millisecond value is used
     * @return the duration spelled out in words
     */
    private String humanReadable(Duration duration) {
        final long millis = duration.getMillis();
        final boolean suppressLeadingZeroElements = true;
        final boolean suppressTrailingZeroElements = true;
        return DurationFormatUtils.formatDurationWords(millis, suppressLeadingZeroElements, suppressTrailingZeroElements);
    }

    /**
     * Publishes a system notification describing the migration status.
     *
     * <p>For any status other than {@code RUNNING}, all existing {@code REMOTE_REINDEX_RUNNING}
     * notifications are destroyed first and the new notification gets the type
     * {@code REMOTE_REINDEX_FINISHED}; for {@code RUNNING} the type is
     * {@code REMOTE_REINDEX_RUNNING}. The notification carries the status name under the detail
     * key {@code "status"}, has severity {@code NORMAL} and is published via {@code publishIfFirst}.
     *
     * @param status migration status to announce
     */
    private void createSystemNotification(RemoteReindexingMigrationAdapter.Status status) {
        final boolean stillRunning = status == RemoteReindexingMigrationAdapter.Status.RUNNING;
        final Notification.Type type;
        if (stillRunning) {
            type = Notification.Type.REMOTE_REINDEX_RUNNING;
        } else {
            // The migration is over, so the "running" notifications are obsolete.
            this.notificationService.destroyAllByType(Notification.Type.REMOTE_REINDEX_RUNNING);
            type = Notification.Type.REMOTE_REINDEX_FINISHED;
        }

        final Notification notification = this.notificationService.buildNow();
        notification.addType(type);
        notification.addDetail("status", (Object)status.name());
        notification.addSeverity(Notification.Severity.NORMAL);
        this.notificationService.publishIfFirst(notification);
    }

    /**
     * Snapshot of the preconditions checked for remote reindexing.
     *
     * @param allowlistConfigured     whether the allowlist is configured
     * @param clusterAllowlistSetting the allowlist setting value carried along with the flag
     * @param status                  health status of the cluster
     */
    private record RemoteReindexConfigurationStatus(boolean allowlistConfigured, String clusterAllowlistSetting, ClusterHealthStatus status) {
        /**
         * @return {@code true} only when the allowlist is configured and the cluster health status
         *         is {@code GREEN}; a {@code null} status counts as not ready
         */
        private boolean isClusterReady() {
            // Identity comparison is the idiomatic enum check and does not throw on a null status.
            return this.allowlistConfigured && this.status == ClusterHealthStatus.GREEN;
        }
    }
}

