package org.apache.flink.table.store.spark;

import java.io.IOException;
import java.io.ObjectInputStream;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.stream.Collectors;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.table.store.file.operation.Lock;
import org.apache.flink.table.store.filesystem.FileSystems;
import org.apache.flink.table.store.table.SupportsWrite;
import org.apache.flink.table.store.table.sink.BucketComputer;
import org.apache.flink.table.store.table.sink.SerializableCommittable;
import org.apache.flink.table.store.table.sink.TableCommit;
import org.apache.flink.table.store.table.sink.TableWrite;
import org.apache.flink.table.types.logical.RowType;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.connector.write.V1Write;
import org.apache.spark.sql.sources.InsertableRelation;

/* loaded from: input_file:org/apache/flink/table/store/spark/SparkWrite.class */
public class SparkWrite implements V1Write {
    private final SupportsWrite table;
    private final String queryId;
    private final Lock.Factory lockFactory;
    private final Configuration conf;

    /* loaded from: input_file:org/apache/flink/table/store/spark/SparkWrite$ComputeBucket.class */
    private static class ComputeBucket implements Function<Row, Integer> {
        private final SupportsWrite table;
        private final RowType type;
        private final Configuration conf;
        private transient BucketComputer lazyComputer;

        private ComputeBucket(SupportsWrite supportsWrite, Configuration configuration) {
            this.table = supportsWrite;
            this.type = supportsWrite.rowType();
            this.conf = configuration;
        }

        private BucketComputer computer() {
            if (this.lazyComputer == null) {
                this.lazyComputer = this.table.bucketComputer();
            }
            return this.lazyComputer;
        }

        public Integer call(Row row) {
            return Integer.valueOf(computer().bucket(new SparkRowData(this.type, row)));
        }

        private void readObject(ObjectInputStream objectInputStream) throws IOException, ClassNotFoundException {
            objectInputStream.defaultReadObject();
            FileSystems.initialize(this.table.location(), this.conf);
        }
    }

    /* loaded from: input_file:org/apache/flink/table/store/spark/SparkWrite$ListConcat.class */
    private static class ListConcat<T> implements Function2<List<T>, List<T>, List<T>> {
        private ListConcat() {
        }

        public List<T> call(List<T> list, List<T> list2) {
            ArrayList arrayList = new ArrayList();
            arrayList.addAll(list);
            arrayList.addAll(list2);
            return arrayList;
        }
    }

    /* loaded from: input_file:org/apache/flink/table/store/spark/SparkWrite$WriteRecords.class */
    private static class WriteRecords implements Function<Iterable<Row>, List<SerializableCommittable>> {
        private final SupportsWrite table;
        private final RowType type;
        private final String queryId;
        private final long commitIdentifier;
        private final Configuration conf;

        private WriteRecords(SupportsWrite supportsWrite, String str, long j, Configuration configuration) {
            this.table = supportsWrite;
            this.type = supportsWrite.rowType();
            this.queryId = str;
            this.commitIdentifier = j;
            this.conf = configuration;
        }

        public List<SerializableCommittable> call(Iterable<Row> iterable) throws Exception {
            TableWrite newWrite = this.table.newWrite(this.queryId);
            Throwable th = null;
            try {
                try {
                    Iterator<Row> it = iterable.iterator();
                    while (it.hasNext()) {
                        newWrite.write(new SparkRowData(this.type, it.next()));
                    }
                    List<SerializableCommittable> list = (List) newWrite.prepareCommit(true, this.commitIdentifier).stream().map(SerializableCommittable::wrap).collect(Collectors.toList());
                    if (newWrite != null) {
                        if (0 != 0) {
                            try {
                                newWrite.close();
                            } catch (Throwable th2) {
                                th.addSuppressed(th2);
                            }
                        } else {
                            newWrite.close();
                        }
                    }
                    return list;
                } finally {
                }
            } catch (Throwable th3) {
                if (newWrite != null) {
                    if (th != null) {
                        try {
                            newWrite.close();
                        } catch (Throwable th4) {
                            th.addSuppressed(th4);
                        }
                    } else {
                        newWrite.close();
                    }
                }
                throw th3;
            }
        }

        private void readObject(ObjectInputStream objectInputStream) throws IOException, ClassNotFoundException {
            objectInputStream.defaultReadObject();
            FileSystems.initialize(this.table.location(), this.conf);
        }
    }

    public SparkWrite(SupportsWrite supportsWrite, String str, Lock.Factory factory, Configuration configuration) {
        this.table = supportsWrite;
        this.queryId = str;
        this.lockFactory = factory;
        this.conf = configuration;
    }

    public InsertableRelation toInsertableRelation() {
        return (dataset, z) -> {
            if (z) {
                throw new UnsupportedOperationException("Overwrite is unsupported.");
            }
            List list = (List) dataset.toJavaRDD().groupBy(new ComputeBucket(this.table, this.conf)).mapValues(new WriteRecords(this.table, this.queryId, 0L, this.conf)).values().reduce(new ListConcat());
            try {
                TableCommit withLock = this.table.newCommit(this.queryId).withLock(this.lockFactory.create());
                Throwable th = null;
                try {
                    withLock.commit(0L, (List) list.stream().map((v0) -> {
                        return v0.delegate();
                    }).collect(Collectors.toList()));
                    if (withLock != null) {
                        if (0 != 0) {
                            try {
                                withLock.close();
                            } catch (Throwable th2) {
                                th.addSuppressed(th2);
                            }
                        } else {
                            withLock.close();
                        }
                    }
                } finally {
                }
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        };
    }
}
