package org.apache.flink.table.store.table;

import java.util.Collections;
import java.util.List;
import org.apache.flink.core.fs.Path;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.store.CoreOptions;
import org.apache.flink.table.store.file.KeyValue;
import org.apache.flink.table.store.file.KeyValueFileStore;
import org.apache.flink.table.store.file.mergetree.compact.ValueCountMergeFunction;
import org.apache.flink.table.store.file.operation.KeyValueFileStoreScan;
import org.apache.flink.table.store.file.predicate.Predicate;
import org.apache.flink.table.store.file.schema.AtomicDataType;
import org.apache.flink.table.store.file.schema.DataField;
import org.apache.flink.table.store.file.schema.KeyValueFieldsExtractor;
import org.apache.flink.table.store.file.schema.RowDataType;
import org.apache.flink.table.store.file.schema.TableSchema;
import org.apache.flink.table.store.file.utils.FileStorePathFactory;
import org.apache.flink.table.store.file.utils.RecordReader;
import org.apache.flink.table.store.table.sink.SinkRecordConverter;
import org.apache.flink.table.store.table.sink.TableWrite;
import org.apache.flink.table.store.table.sink.TableWriteImpl;
import org.apache.flink.table.store.table.source.AbstractDataTableScan;
import org.apache.flink.table.store.table.source.KeyValueTableRead;
import org.apache.flink.table.store.table.source.MergeTreeSplitGenerator;
import org.apache.flink.table.store.table.source.SplitGenerator;
import org.apache.flink.table.store.table.source.TableRead;
import org.apache.flink.table.store.table.source.ValueCountRowDataRecordIterator;
import org.apache.flink.table.types.logical.BigIntType;
import org.apache.flink.types.RowKind;

/* loaded from: input_file:org/apache/flink/table/store/table/ChangelogValueCountFileStoreTable.class */
public class ChangelogValueCountFileStoreTable extends AbstractFileStoreTable {
    private static final long serialVersionUID = 1;
    private transient KeyValueFileStore lazyStore;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/flink/table/store/table/ChangelogValueCountFileStoreTable$ValueCountTableKeyValueFieldsExtractor.class */
    public static class ValueCountTableKeyValueFieldsExtractor implements KeyValueFieldsExtractor {
        private static final long serialVersionUID = 1;
        static final ValueCountTableKeyValueFieldsExtractor EXTRACTOR = new ValueCountTableKeyValueFieldsExtractor();

        private ValueCountTableKeyValueFieldsExtractor() {
        }

        @Override // org.apache.flink.table.store.file.schema.KeyValueFieldsExtractor
        public List<DataField> keyFields(TableSchema tableSchema) {
            return tableSchema.fields();
        }

        @Override // org.apache.flink.table.store.file.schema.KeyValueFieldsExtractor
        public List<DataField> valueFields(TableSchema tableSchema) {
            return Collections.singletonList(new DataField(0, TableSchema.VALUE_COUNT, new AtomicDataType(new BigIntType(false))));
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public ChangelogValueCountFileStoreTable(Path path, TableSchema tableSchema) {
        super(path, tableSchema);
    }

    @Override // org.apache.flink.table.store.table.AbstractFileStoreTable
    protected FileStoreTable copy(TableSchema tableSchema) {
        return new ChangelogValueCountFileStoreTable(this.path, tableSchema);
    }

    @Override // org.apache.flink.table.store.table.AbstractFileStoreTable
    public KeyValueFileStore store() {
        if (this.lazyStore == null) {
            ValueCountTableKeyValueFieldsExtractor valueCountTableKeyValueFieldsExtractor = ValueCountTableKeyValueFieldsExtractor.EXTRACTOR;
            this.lazyStore = new KeyValueFileStore(schemaManager(), this.tableSchema.id(), new CoreOptions(this.tableSchema.options()), this.tableSchema.logicalPartitionType(), this.tableSchema.logicalBucketKeyType(), RowDataType.toRowType(false, valueCountTableKeyValueFieldsExtractor.keyFields(this.tableSchema)), RowDataType.toRowType(false, valueCountTableKeyValueFieldsExtractor.valueFields(this.tableSchema)), valueCountTableKeyValueFieldsExtractor, ValueCountMergeFunction.factory());
        }
        return this.lazyStore;
    }

    @Override // org.apache.flink.table.store.table.DataTable, org.apache.flink.table.store.table.Table
    public AbstractDataTableScan newScan() {
        final KeyValueFileStoreScan newScan = store().newScan();
        return new AbstractDataTableScan(newScan, this.tableSchema, store().pathFactory(), options()) { // from class: org.apache.flink.table.store.table.ChangelogValueCountFileStoreTable.1
            @Override // org.apache.flink.table.store.table.source.AbstractDataTableScan
            protected SplitGenerator splitGenerator(FileStorePathFactory fileStorePathFactory) {
                return new MergeTreeSplitGenerator(ChangelogValueCountFileStoreTable.this.store().newKeyComparator(), ChangelogValueCountFileStoreTable.this.store().options().splitTargetSize(), ChangelogValueCountFileStoreTable.this.store().options().splitOpenFileCost());
            }

            @Override // org.apache.flink.table.store.table.source.AbstractDataTableScan
            protected void withNonPartitionFilter(Predicate predicate) {
                newScan.withKeyFilter(predicate);
            }
        };
    }

    @Override // org.apache.flink.table.store.table.Table
    public TableRead newRead() {
        return new KeyValueTableRead(store().newRead()) { // from class: org.apache.flink.table.store.table.ChangelogValueCountFileStoreTable.2
            @Override // org.apache.flink.table.store.table.source.TableRead
            public TableRead withFilter(Predicate predicate) {
                this.read.withFilter(predicate);
                return this;
            }

            @Override // org.apache.flink.table.store.table.source.TableRead
            public TableRead withProjection(int[][] iArr) {
                this.read.withKeyProjection(iArr);
                return this;
            }

            @Override // org.apache.flink.table.store.table.source.KeyValueTableRead
            protected RecordReader.RecordIterator<RowData> rowDataRecordIteratorFromKv(RecordReader.RecordIterator<KeyValue> recordIterator) {
                return new ValueCountRowDataRecordIterator(recordIterator);
            }
        };
    }

    @Override // org.apache.flink.table.store.table.SupportsWrite
    public TableWrite newWrite(String str) {
        KeyValue keyValue = new KeyValue();
        return new TableWriteImpl(store().newWrite(str), new SinkRecordConverter(this.tableSchema), sinkRecord -> {
            switch (sinkRecord.row().getRowKind()) {
                case INSERT:
                case UPDATE_AFTER:
                    keyValue.replace(sinkRecord.row(), RowKind.INSERT, GenericRowData.of(1L));
                    break;
                case UPDATE_BEFORE:
                case DELETE:
                    keyValue.replace(sinkRecord.row(), RowKind.INSERT, GenericRowData.of(-1L));
                    break;
                default:
                    throw new UnsupportedOperationException("Unknown row kind " + sinkRecord.row().getRowKind());
            }
            return keyValue;
        });
    }
}
