package org.pentaho.hadoop.mapreduce;

import java.io.IOException;
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;
import org.pentaho.di.core.exception.KettleException;
import org.pentaho.di.core.logging.KettleLogStore;
import org.pentaho.di.core.logging.KettleLoggingEvent;
import org.pentaho.di.core.logging.LogLevel;
import org.pentaho.di.core.row.RowMeta;
import org.pentaho.di.core.row.RowMetaInterface;
import org.pentaho.di.core.variables.VariableSpace;
import org.pentaho.di.trans.RowProducer;
import org.pentaho.di.trans.SingleThreadedTransExecutor;
import org.pentaho.di.trans.step.BaseStepMeta;
import org.pentaho.di.trans.step.StepInterface;
import org.pentaho.di.trans.step.StepMeta;
import org.pentaho.hadoop.mapreduce.PentahoMapReduceBase;
import org.pentaho.hadoop.mapreduce.converter.TypeConverterFactory;
import org.pentaho.hadoop.mapreduce.converter.spi.ITypeConverter;

/* loaded from: input_file:org/pentaho/hadoop/mapreduce/GenericTransReduce.class */
/**
 * Reducer that pushes each incoming key and its values into a Kettle transformation and collects
 * the rows produced by the configured output step into the Hadoop {@link OutputCollector}.
 *
 * <p>Two execution modes are supported, selected by {@link #isSingleThreaded()}:
 * <ul>
 *   <li><b>single threaded</b> - the transformation is prepared once (on the first call that finds
 *       it not running) and then driven one iteration per {@link #reduce} call through a
 *       {@link SingleThreadedTransExecutor};</li>
 *   <li><b>multi threaded</b> - the transformation is recreated, prepared, run to completion and
 *       disposed on every {@link #reduce} call.</li>
 * </ul>
 *
 * <p>NOTE(review): the status messages refer to a "reducer/combiner transformation", so this class
 * is presumably also used as a combiner - confirm against the job setup code.
 *
 * @param <K>  type of the incoming key
 * @param <V>  type of the incoming values
 * @param <K2> type of the emitted key
 * @param <V2> type of the emitted value
 */
public class GenericTransReduce<K extends WritableComparable<?>, V extends Iterator<Writable>, K2, V2> extends PentahoMapReduceBase<K2, V2> implements Reducer<K, V, K2, V2> {
    /** Producer used to inject rows into the transformation's input step; {@code null} until one is attached. */
    protected RowProducer rowProducer;
    /**
     * Value read ahead from the iterator while building the value converter. It is injected first
     * by {@link #injectValues} and reset to {@code null} once all values were injected.
     */
    protected Object value;
    /** Factory used to look up converters from the incoming key/value classes to Kettle value metas. */
    protected TypeConverterFactory typeConverterFactory;
    /** Row layout reported by the injector step's meta. */
    protected RowMetaInterface injectorRowMeta;
    /** Executor driving the transformation in single threaded mode. */
    protected SingleThreadedTransExecutor executor;
    /** Positions of the key and value fields inside {@link #injectorRowMeta}; {@code null} when unknown. */
    protected InKeyValueOrdinals inOrdinals = null;
    /** Converter for the incoming key; {@code null} when none was resolved. */
    protected ITypeConverter inConverterK = null;
    /** Converter for the incoming values; {@code null} when none was resolved. */
    protected ITypeConverter inConverterV = null;

    /**
     * Creates the reducer, marks the base class as running a Reduce operation and sets up the
     * type converter factory.
     *
     * @throws KettleException declared for the base class initialisation
     */
    public GenericTransReduce() throws KettleException {
        setMRType(PentahoMapReduceBase.MROperations.Reduce);
        this.typeConverterFactory = new TypeConverterFactory();
    }

    /** @return the value of the inherited {@code reduceSingleThreaded} flag */
    public boolean isSingleThreaded() {
        return this.reduceSingleThreaded;
    }

    /** @return the name of the step rows are injected into */
    public String getInputStepName() {
        return this.reduceInputStepName;
    }

    /** @return the name of the step whose output rows are collected */
    public String getOutputStepName() {
        return this.reduceOutputStepName;
    }

    /**
     * Runs the transformation for one key and all of its values.
     *
     * @param k               the key being reduced
     * @param it              the values belonging to {@code k}
     * @param outputCollector receives the rows produced by the output step
     * @param reporter        used for status reporting
     * @throws IOException wrapping any exception raised while preparing or running the
     *                     transformation, including the case where the transformation logged errors
     */
    public void reduce(K k, Iterator<V> it, OutputCollector<K2, V2> outputCollector, Reporter reporter) throws IOException {
        try {
            if (this.debug) {
                reporter.setStatus("Begin processing record");
            }
            if (this.trans == null) {
                throw new RuntimeException("Error initializing transformation.  See error log.");
            }
            if (isSingleThreaded()) {
                // The transformation is only prepared once; later calls just inject and iterate.
                if (!this.trans.isRunning()) {
                    shareVariableSpaceWithTrans(reporter);
                    setTransLogLevel(reporter);
                    prepareExecution(reporter);
                    addInjectorAndProducerToTrans(k, it, outputCollector, reporter, getInputStepName(), getOutputStepName());
                    this.executor = new SingleThreadedTransExecutor(this.trans);
                    if (!this.executor.init()) {
                        throw new KettleException("Unable to initialize the single threaded transformation, check the log for details.");
                    }
                }
                injectValues(k, it, outputCollector, reporter);
                this.executor.oneIteration();
            } else {
                // A fresh transformation is built for every key; drop the log lines of the previous one first.
                KettleLogStore.discardLines(this.trans.getLogChannelId(), true);
                this.trans = MRUtil.recreateTrans(this.trans);
                shareVariableSpaceWithTrans(reporter);
                setTransLogLevel(reporter);
                prepareExecution(reporter);
                addInjectorAndProducerToTrans(k, it, outputCollector, reporter, getInputStepName(), getOutputStepName());
                try {
                    injectValues(k, it, outputCollector, reporter);
                    this.trans.waitUntilFinished();
                    setDebugStatus(reporter, "Transformation has finished");
                } finally {
                    // Always release the transformation, whether it finished or failed.
                    disposeTransformation();
                }
            }
            if (this.trans.getErrors() > 0) {
                setDebugStatus(reporter, "Errors detected in reducer/combiner transformation");
                // The exception type is kept as-is so the cause seen through the IOException is unchanged.
                throw new Exception("Errors were detected for reducer/combiner transformation:\n\n" + collectErrorLogLines());
            }
            if (this.debug) {
                reporter.setStatus("Completed processing record");
            }
        } catch (Exception e) {
            printException(reporter, e);
            throw new IOException(e);
        }
    }

    /**
     * Gathers the messages of all ERROR level events in the log buffer of the current
     * transformation's log channel, one per line.
     *
     * @return the concatenated error messages, each followed by a newline; empty when there are none
     */
    private String collectErrorLogLines() {
        List<KettleLoggingEvent> events = KettleLogStore.getLogBufferFromTo(this.trans.getLogChannelId(), false, 0, KettleLogStore.getLastBufferLineNr());
        StringBuilder errors = new StringBuilder();
        for (KettleLoggingEvent event : events) {
            if (event.getLevel() == LogLevel.ERROR) {
                // String.valueOf keeps a null message from masking the real failure with an NPE.
                errors.append(String.valueOf(event.getMessage())).append("\n");
            }
        }
        return errors.toString();
    }

    /**
     * Reports a failure: prints the stack trace to {@code System.err} and updates the debug status.
     * Rethrowing is left to the caller.
     *
     * @param reporter used for status reporting
     * @param exc      the exception to report
     */
    private void printException(Reporter reporter, Exception exc) {
        exc.printStackTrace(System.err);
        setDebugStatus(reporter, "An exception was raised");
    }

    /**
     * Stops and cleans up the current transformation. Both steps are best effort: an exception in
     * one is printed and does not prevent the other from running.
     */
    private void disposeTransformation() {
        try {
            this.trans.stopAll();
        } catch (Exception e) {
            e.printStackTrace();
        }
        try {
            this.trans.cleanup();
        } catch (Exception e2) {
            e2.printStackTrace();
        }
    }

    /**
     * Injects the read-ahead value (if any) followed by every remaining value of the iterator into
     * the transformation, then signals the row producer that input is finished. Does nothing when
     * no row producer is attached.
     *
     * @param k               the key paired with every injected value
     * @param it              the remaining values for {@code k}
     * @param outputCollector unused here; kept for signature symmetry with the other helpers
     * @param reporter        used for status reporting
     * @throws Exception if injecting a row fails
     */
    private void injectValues(K k, Iterator<V> it, OutputCollector<K2, V2> outputCollector, Reporter reporter) throws Exception {
        if (this.rowProducer == null) {
            return;
        }
        // A value may already have been pulled off the iterator while resolving the value converter.
        if (this.value != null) {
            injectCurrentValue(k, reporter);
        }
        while (it.hasNext()) {
            this.value = it.next();
            injectCurrentValue(k, reporter);
        }
        this.value = null;
        this.rowProducer.finished();
    }

    /**
     * Injects the pair ({@code k}, {@link #value}) into the transformation, passing the key/value
     * ordinals along when they are known.
     *
     * @param k        the key to inject
     * @param reporter used for status reporting
     * @throws Exception if injecting the row fails
     */
    private void injectCurrentValue(K k, Reporter reporter) throws Exception {
        if (this.inOrdinals != null) {
            injectValue(k, this.inOrdinals.getKeyOrdinal(), this.inConverterK, this.value, this.inOrdinals.getValueOrdinal(), this.inConverterV, this.injectorRowMeta, this.rowProducer, reporter);
        } else {
            injectValue(k, this.inConverterK, this.value, this.inConverterV, this.injectorRowMeta, this.rowProducer, reporter);
        }
    }

    /**
     * Prepares the transformation for execution without arguments.
     *
     * @param reporter used for status reporting
     * @throws KettleException if the transformation cannot be prepared
     */
    private void prepareExecution(Reporter reporter) throws KettleException {
        setDebugStatus(reporter, "Preparing transformation for execution");
        this.trans.prepareExecution((String[]) null);
    }

    /**
     * Applies the inherited {@code logLevel} to the transformation, or only reports that it is
     * unset when it is {@code null}.
     *
     * @param reporter used for status reporting
     */
    private void setTransLogLevel(Reporter reporter) {
        if (this.logLevel == null) {
            setDebugStatus(reporter, getClass().getName() + ".logLevel is null.  The trans log level will not be set.");
        } else {
            setDebugStatus(reporter, "Setting the trans.logLevel to " + this.logLevel.toString());
            this.trans.setLogLevel(this.logLevel);
        }
    }

    /**
     * Shares the inherited {@code variableSpace} with the transformation and, in debug mode,
     * reports every variable of the transformation in sorted order.
     *
     * @param reporter used for status reporting
     */
    private void shareVariableSpaceWithTrans(Reporter reporter) {
        if (this.variableSpace == null) {
            setDebugStatus(reporter, "variableSpace is null.  We are not going to share it with the trans.");
            return;
        }
        setDebugStatus(reporter, "Sharing the VariableSpace from the PDI job.");
        this.trans.shareVariablesWith(this.variableSpace);
        if (this.debug) {
            String[] variableNames = this.trans.listVariables();
            // Guard before wrapping/sorting: checking after the sort would be too late to help.
            if (variableNames != null) {
                List<String> sortedNames = Arrays.asList(variableNames);
                Collections.sort(sortedNames);
                setDebugStatus(reporter, "Variables: ");
                for (String name : sortedNames) {
                    setDebugStatus(reporter, "     " + name + " = " + this.trans.getVariable(name));
                }
            }
        }
    }

    /**
     * Wires the transformation to Hadoop: attaches a row listener to the output step, attaches a
     * row producer to the input step, resolves the key/value converters and starts the
     * transformation threads.
     *
     * <p>When the output step cannot be found and no output step name was given, the method returns
     * without attaching anything. When an input step name is given, the first value may be read
     * from {@code it} to determine the value converter; it is kept in {@link #value} so that
     * {@link #injectValues} can still inject it.
     *
     * @param k               the current key; its class selects the key converter
     * @param it              the values for {@code k}
     * @param outputCollector receives the rows produced by the output step
     * @param reporter        used for status reporting
     * @param inputStepName   name of the injector step, may be {@code null}
     * @param outputStepName  name of the output step, may be {@code null}
     * @throws Exception if a named output step is missing, the injector step does not define both
     *                   key and value fields, or the base class recorded an exception
     */
    private void addInjectorAndProducerToTrans(K k, Iterator<V> it, OutputCollector<K2, V2> outputCollector, Reporter reporter, String inputStepName, String outputStepName) throws Exception {
        setDebugStatus(reporter, "Locating output step: " + outputStepName);
        StepInterface outputStep = this.trans.findRunThread(outputStepName);
        if (outputStep == null) {
            if (outputStepName != null) {
                setDebugStatus(reporter, "Output step [" + outputStepName + "] could not be found");
                throw new KettleException("Output step not defined in transformation");
            }
            setDebugStatus(reporter, "Output step name not specified");
            return;
        }
        this.rowCollector = new OutputCollectorRowListener<>(outputCollector, this.outClassK, this.outClassV, reporter, this.debug);
        outputStep.addRowListener(this.rowCollector);
        this.injectorRowMeta = new RowMeta();
        setDebugStatus(reporter, "Locating input step: " + inputStepName);
        if (inputStepName != null) {
            this.rowProducer = this.trans.addRowProducer(inputStepName, 0);
            BaseStepMeta stepMetaInterface = this.rowProducer.getStepInterface().getStepMeta().getStepMetaInterface();
            this.inOrdinals = null;
            // With the declared type above this check only filters out null.
            // NOTE(review): the declaration looks like a decompiler artifact - confirm the intended type.
            if (stepMetaInterface instanceof BaseStepMeta) {
                setDebugStatus(reporter, "Generating converters from RowMeta for injection into the transformation");
                stepMetaInterface.getFields(this.injectorRowMeta, (String) null, (RowMetaInterface[]) null, (StepMeta) null, (VariableSpace) null);
                this.inOrdinals = new InKeyValueOrdinals(this.injectorRowMeta);
                if (this.inOrdinals.getKeyOrdinal() < 0 || this.inOrdinals.getValueOrdinal() < 0) {
                    throw new KettleException("key or value is not defined in transformation injector step");
                }
                if (this.injectorRowMeta.getValueMeta(this.inOrdinals.getKeyOrdinal()) != null) {
                    this.inConverterK = this.typeConverterFactory.getConverter(k.getClass(), this.injectorRowMeta.getValueMeta(this.inOrdinals.getKeyOrdinal()));
                }
                // Read one value ahead: its runtime class is needed to pick the value converter.
                if (it.hasNext()) {
                    this.value = it.next();
                }
                if (this.value != null && this.injectorRowMeta.getValueMeta(this.inOrdinals.getValueOrdinal()) != null) {
                    this.inConverterV = this.typeConverterFactory.getConverter(this.value.getClass(), this.injectorRowMeta.getValueMeta(this.inOrdinals.getValueOrdinal()));
                }
            }
            this.trans.startThreads();
        } else {
            setDebugStatus(reporter, "No input stepname was defined");
        }
        if (getException() != null) {
            setDebugStatus(reporter, "An exception was generated by the transformation");
            throw getException();
        }
    }

    /**
     * In single threaded mode, disposes the executor (logging, not propagating, a failure to do so)
     * and discards the transformation's log lines; then delegates to the base class.
     *
     * @throws IOException if the base class fails to close
     */
    @Override // org.pentaho.hadoop.mapreduce.PentahoMapReduceBase
    public void close() throws IOException {
        if (isSingleThreaded() && this.executor != null) {
            try {
                this.executor.dispose();
            } catch (KettleException e) {
                e.printStackTrace(System.err);
                this.trans.getLogChannel().logError("Error disposing of single threading transformation: ", e);
            }
            KettleLogStore.discardLines(this.trans.getLogChannelId(), true);
        }
        super.close();
    }
}
