package org.apache.flink.table.store.table.source;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.stream.IntStream;
import javax.annotation.Nullable;
import org.apache.flink.shaded.guava30.com.google.common.collect.Iterators;
import org.apache.flink.shaded.guava30.com.google.common.primitives.Ints;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.store.file.mergetree.compact.ConcatRecordReader;
import org.apache.flink.table.store.file.predicate.Predicate;
import org.apache.flink.table.store.file.predicate.PredicateBuilder;
import org.apache.flink.table.store.file.predicate.PredicateFilter;
import org.apache.flink.table.store.file.utils.RecordReaderIterator;
import org.apache.flink.table.store.table.FileStoreTable;
import org.apache.flink.table.store.table.source.DataTableScan;
import org.apache.flink.table.store.table.source.snapshot.ContinuousDataFileSnapshotEnumerator;
import org.apache.flink.table.store.table.source.snapshot.SnapshotEnumerator;
import org.apache.flink.table.store.utils.TypeUtils;

/* loaded from: input_file:org/apache/flink/table/store/table/source/TableStreamingReader.class */
public class TableStreamingReader {
    private final FileStoreTable table;
    private final int[] projection;

    @Nullable
    private final Predicate predicate;

    @Nullable
    private final PredicateFilter recordFilter;
    private final SnapshotEnumerator enumerator;

    public TableStreamingReader(FileStoreTable fileStoreTable, int[] iArr, @Nullable Predicate predicate) {
        this.table = fileStoreTable;
        this.projection = iArr;
        this.predicate = predicate;
        if (predicate != null) {
            List<String> fieldNames = fileStoreTable.schema().fieldNames();
            List<String> primaryKeys = fileStoreTable.schema().primaryKeys();
            this.recordFilter = new PredicateFilter(TypeUtils.project(fileStoreTable.schema().logicalRowType(), iArr), PredicateBuilder.transformFieldMapping(predicate, IntStream.range(0, fileStoreTable.schema().fields().size()).map(i -> {
                int indexOf = Ints.indexOf(iArr, i);
                if (primaryKeys.isEmpty() || primaryKeys.contains(fieldNames.get(i))) {
                    return indexOf;
                }
                return -1;
            }).toArray()).orElse(null));
        } else {
            this.recordFilter = null;
        }
        DataTableScan newScan = fileStoreTable.newScan();
        if (predicate != null) {
            newScan.withFilter(predicate);
        }
        this.enumerator = ContinuousDataFileSnapshotEnumerator.createWithSnapshotStarting(fileStoreTable, newScan);
    }

    @Nullable
    public Iterator<RowData> nextBatch() throws Exception {
        DataTableScan.DataFilePlan enumerate = this.enumerator.enumerate();
        if (enumerate == null) {
            return null;
        }
        return read(enumerate);
    }

    private Iterator<RowData> read(DataTableScan.DataFilePlan dataFilePlan) throws IOException {
        TableRead withProjection = this.table.newRead().withProjection(this.projection);
        if (this.predicate != null) {
            withProjection.withFilter(this.predicate);
        }
        ArrayList arrayList = new ArrayList();
        for (DataSplit dataSplit : dataFilePlan.splits) {
            arrayList.add(() -> {
                return withProjection.createReader(dataSplit);
            });
        }
        RecordReaderIterator recordReaderIterator = new RecordReaderIterator(ConcatRecordReader.create(arrayList));
        if (this.recordFilter == null) {
            return recordReaderIterator;
        }
        PredicateFilter predicateFilter = this.recordFilter;
        predicateFilter.getClass();
        return Iterators.filter(recordReaderIterator, predicateFilter::test);
    }
}
