package org.apache.pig.backend.hadoop.executionengine.mapReduceLayer;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.PrintStream;
import java.lang.Thread;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.Map;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.Counters;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RunningJob;
import org.apache.hadoop.mapred.jobcontrol.Job;
import org.apache.hadoop.mapred.jobcontrol.JobControl;
import org.apache.pig.ExecType;
import org.apache.pig.PigException;
import org.apache.pig.PigWarning;
import org.apache.pig.backend.executionengine.ExecException;
import org.apache.pig.backend.hadoop.datastorage.ConfigurationUtil;
import org.apache.pig.backend.hadoop.executionengine.HExecutionEngine;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MRCompiler;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.DotMRPrinter;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.EndOfAllInputSetter;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.MROperPlan;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.MRPrinter;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.POPackageAnnotator;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POJoinPackage;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
import org.apache.pig.backend.hadoop.executionengine.shims.HadoopShims;
import org.apache.pig.impl.PigContext;
import org.apache.pig.impl.io.FileSpec;
import org.apache.pig.impl.plan.CompilationMessageCollector;
import org.apache.pig.impl.plan.PlanException;
import org.apache.pig.impl.plan.VisitorException;
import org.apache.pig.impl.util.ConfigurationValidator;
import org.apache.pig.impl.util.LogUtils;
import org.apache.pig.impl.util.UDFContext;
import org.apache.pig.tools.pigstats.PigStats;
import org.apache.pig.tools.pigstats.PigStatsUtil;
import org.apache.pig.tools.pigstats.ScriptState;

/* loaded from: input_file:org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.class */
public class MapReduceLauncher extends Launcher {
    public static final String SUCCEEDED_FILE_NAME = "_SUCCESS";
    public static final String SUCCESSFUL_JOB_OUTPUT_DIR_MARKER = "mapreduce.fileoutputcommitter.marksuccessfuljobs";
    public static final String PROP_EXEC_MAP_PARTAGG = "pig.exec.mapPartAgg";
    private static final Log log = LogFactory.getLog(MapReduceLauncher.class);
    private Map<FileSpec, Exception> failureMap;
    private Exception jobControlException = null;
    private String jobControlExceptionStackTrace = null;
    private boolean aggregateWarning = false;
    private JobControl jc = null;

    /* loaded from: input_file:org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher$HangingJobKiller.class */
    private class HangingJobKiller extends Thread {
        public HangingJobKiller() {
        }

        @Override // java.lang.Thread, java.lang.Runnable
        public void run() {
            try {
                MapReduceLauncher.log.debug("Receive kill signal");
                if (MapReduceLauncher.this.jc != null) {
                    Iterator it = MapReduceLauncher.this.jc.getRunningJobs().iterator();
                    while (it.hasNext()) {
                        Job job = (Job) it.next();
                        RunningJob job2 = job.getJobClient().getJob(job.getAssignedJobID());
                        if (job2 != null) {
                            job2.killJob();
                        }
                        MapReduceLauncher.log.info("Job " + job.getJobID() + " killed");
                    }
                }
            } catch (Exception e) {
                MapReduceLauncher.log.warn("Encounter exception on cleanup:" + e);
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher$JobControlThreadExceptionHandler.class */
    public class JobControlThreadExceptionHandler implements Thread.UncaughtExceptionHandler {
        JobControlThreadExceptionHandler() {
        }

        @Override // java.lang.Thread.UncaughtExceptionHandler
        public void uncaughtException(Thread thread, Throwable th) {
            MapReduceLauncher.this.jobControlExceptionStackTrace = MapReduceLauncher.this.getStackStraceStr(th);
            try {
                MapReduceLauncher.this.jobControlException = MapReduceLauncher.this.getExceptionFromString(MapReduceLauncher.this.jobControlExceptionStackTrace);
            } catch (Exception e) {
                MapReduceLauncher.this.jobControlException = new RuntimeException("Could not resolve error that occured when launching map reduce job: " + MapReduceLauncher.this.jobControlExceptionStackTrace);
            }
        }
    }

    public MapReduceLauncher() {
        Runtime.getRuntime().addShutdownHook(new HangingJobKiller());
    }

    public Exception getError(FileSpec fileSpec) {
        return this.failureMap.get(fileSpec);
    }

    @Override // org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.Launcher
    public void reset() {
        this.failureMap = new HashMap();
        super.reset();
    }

    @Override // org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.Launcher
    public PigStats launchPig(PhysicalPlan physicalPlan, String str, PigContext pigContext) throws PlanException, VisitorException, IOException, ExecException, JobCreationException, Exception {
        String str2;
        this.aggregateWarning = "true".equalsIgnoreCase(pigContext.getProperties().getProperty("aggregate.warning"));
        MROperPlan compile = compile(physicalPlan, pigContext);
        ConfigurationValidator.validatePigProperties(pigContext.getProperties());
        Configuration configuration = ConfigurationUtil.toConfiguration(pigContext.getProperties());
        JobClient jobClient = new JobClient(pigContext.getExecutionEngine().getJobConf());
        JobControlCompiler jobControlCompiler = new JobControlCompiler(pigContext, configuration);
        PigStatsUtil.startCollection(pigContext, jobClient, jobControlCompiler, compile);
        LinkedList<Job> linkedList = new LinkedList();
        LinkedList linkedList2 = new LinkedList();
        LinkedList linkedList3 = new LinkedList();
        LinkedList<Job> linkedList4 = new LinkedList();
        int size = compile.size();
        int i = 0;
        double d = -1.0d;
        JobControlThreadExceptionHandler jobControlThreadExceptionHandler = new JobControlThreadExceptionHandler();
        boolean equals = pigContext.getProperties().getProperty("stop.on.failure", "false").equals("true");
        while (compile.size() != 0) {
            this.jc = jobControlCompiler.compile(compile, str);
            if (this.jc == null) {
                LinkedList<MapReduceOper> linkedList5 = new LinkedList();
                linkedList5.addAll(compile.getRoots());
                for (MapReduceOper mapReduceOper : linkedList5) {
                    if (mapReduceOper instanceof NativeMapReduceOper) {
                        NativeMapReduceOper nativeMapReduceOper = (NativeMapReduceOper) mapReduceOper;
                        try {
                            ScriptState.get().emitJobsSubmittedNotification(1);
                            nativeMapReduceOper.runJob();
                            i++;
                        } catch (IOException e) {
                            compile.trimBelow((MROperPlan) nativeMapReduceOper);
                            linkedList2.add(nativeMapReduceOper);
                            String str3 = "Error running native mapreduce operator job :" + nativeMapReduceOper.getJobId() + e.getMessage();
                            LogUtils.writeLog(str3, getStackStraceStr(e), pigContext.getProperties().getProperty("pig.logfile"), log);
                            log.info(str3);
                            if (equals) {
                                throw new ExecException(str3, 6017, (byte) 16);
                            }
                        }
                        double d2 = i / size;
                        notifyProgress(d2, d);
                        d = d2;
                        compile.remove(nativeMapReduceOper);
                    }
                }
            } else {
                ArrayList<Job> waitingJobs = this.jc.getWaitingJobs();
                log.info(waitingJobs.size() + " map-reduce job(s) waiting for submission.");
                ScriptState.get().emitJobsSubmittedNotification(waitingJobs.size());
                PigStatsUtil.updateJobMroMap(jobControlCompiler.getJobMroMap());
                JobConf jobConf = ((Job) waitingJobs.get(0)).getJobConf();
                try {
                    String str4 = jobConf.get("mapred.job.tracker.http.address");
                    String str5 = jobConf.get(HExecutionEngine.JOB_TRACKER_LOCATION);
                    str2 = str5.substring(0, str5.indexOf(":")) + str4.substring(str4.indexOf(":"));
                } catch (Exception e2) {
                    str2 = null;
                    log.debug("Failed to get job tracker location.");
                }
                linkedList3.clear();
                final UDFContext uDFContext = UDFContext.getUDFContext();
                Thread thread = new Thread(this.jc) { // from class: org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceLauncher.1
                    @Override // java.lang.Thread, java.lang.Runnable
                    public void run() {
                        UDFContext.setUdfContext(uDFContext.m200clone());
                        super.run();
                    }
                };
                thread.setUncaughtExceptionHandler(jobControlThreadExceptionHandler);
                thread.setContextClassLoader(PigContext.getClassLoader());
                thread.start();
                while (!this.jc.allFinished()) {
                    try {
                        Thread.sleep(500L);
                    } catch (InterruptedException e3) {
                    }
                    ArrayList arrayList = new ArrayList();
                    for (Job job : waitingJobs) {
                        if (job.getAssignedJobID() != null) {
                            arrayList.add(job);
                            log.info("HadoopJobId: " + job.getAssignedJobID());
                            if (str2 != null) {
                                log.info("More information at: http://" + str2 + "/jobdetails.jsp?jobid=" + job.getAssignedJobID());
                            }
                            ScriptState.get().emitJobStartedNotification(job.getAssignedJobID().toString());
                        }
                    }
                    waitingJobs.removeAll(arrayList);
                    double calculateProgress = (i + calculateProgress(this.jc, jobClient)) / size;
                    notifyProgress(calculateProgress, d);
                    d = calculateProgress;
                    PigStatsUtil.accumulateStats(this.jc);
                }
                if (this.jobControlException != null) {
                    if (!(this.jobControlException instanceof PigException)) {
                        throw new ExecException("Unexpected error when launching map reduce job.", 2117, (byte) 4, this.jobControlException);
                    }
                    if (this.jobControlExceptionStackTrace != null) {
                        LogUtils.writeLog("Error message from job controller", this.jobControlExceptionStackTrace, pigContext.getProperties().getProperty("pig.logfile"), log);
                    }
                    throw this.jobControlException;
                }
                if (!this.jc.getFailedJobs().isEmpty()) {
                    if (equals) {
                        StringBuilder sb = new StringBuilder();
                        for (int i2 = 0; i2 < this.jc.getFailedJobs().size(); i2++) {
                            sb.append(((Job) this.jc.getFailedJobs().get(i2)).getMessage());
                            if (i2 != this.jc.getFailedJobs().size() - 1) {
                                sb.append("\n");
                            }
                        }
                        throw new ExecException(sb.toString(), 6017, (byte) 16);
                    }
                    Iterator it = this.jc.getFailedJobs().iterator();
                    while (it.hasNext()) {
                        Job job2 = (Job) it.next();
                        linkedList3.add(job2);
                        log.info("job " + job2.getAssignedJobID() + " has failed! Stop running all dependent jobs");
                    }
                    linkedList.addAll(this.jc.getFailedJobs());
                }
                i += jobControlCompiler.updateMROpPlan(linkedList3);
                ArrayList successfulJobs = this.jc.getSuccessfulJobs();
                jobControlCompiler.moveResults(successfulJobs);
                linkedList4.addAll(successfulJobs);
                PigStatsUtil.accumulateStats(this.jc);
                this.jc.stop();
            }
        }
        ScriptState.get().emitProgressUpdatedNotification(100);
        log.info("100% complete");
        boolean z = linkedList2.size() > 0;
        if (linkedList != null && linkedList.size() > 0) {
            Exception exc = null;
            for (Job job3 : linkedList) {
                try {
                    getStats(job3, jobClient, true, pigContext);
                } catch (Exception e4) {
                    exc = e4;
                }
                Iterator<POStore> it2 = jobControlCompiler.getStores(job3).iterator();
                while (it2.hasNext()) {
                    this.failureMap.put(it2.next().getSFile(), exc);
                }
                PigStatsUtil.setBackendException(job3, exc);
            }
            z = true;
        }
        PigStatsUtil.stopCollection(true);
        boolean z2 = z || !PigStats.get().isSuccessful();
        Map<Enum, Long> hashMap = new HashMap<>();
        if (linkedList4 != null) {
            for (Job job4 : linkedList4) {
                for (POStore pOStore : jobControlCompiler.getStores(job4)) {
                    if (pigContext.getExecType() == ExecType.LOCAL) {
                        HadoopShims.storeSchemaForLocal(job4, pOStore);
                    }
                    if (pOStore.isTmpStore()) {
                        log.debug("Successfully stored result in: \"" + pOStore.getSFile().getFileName() + "\"");
                    } else {
                        createSuccessFile(job4, pOStore);
                    }
                }
                getStats(job4, jobClient, false, pigContext);
                if (this.aggregateWarning) {
                    computeWarningAggregate(job4, jobClient, hashMap);
                }
            }
        }
        if (this.aggregateWarning) {
            CompilationMessageCollector.logAggregate(hashMap, CompilationMessageCollector.MessageType.Warning, log);
        }
        if (!z2) {
            log.info("Success!");
        } else if (linkedList4 == null || linkedList4.size() <= 0) {
            log.info("Failed!");
        } else {
            log.info("Some jobs have failed! Stop running all dependent jobs");
        }
        jobControlCompiler.reset();
        return PigStatsUtil.getPigStats(z2 ? (linkedList4 == null || linkedList4.size() <= 0) ? 2 : 3 : 0);
    }

    /* JADX INFO: Access modifiers changed from: private */
    public String getStackStraceStr(Throwable th) {
        ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
        th.printStackTrace(new PrintStream(byteArrayOutputStream));
        return byteArrayOutputStream.toString();
    }

    private void notifyProgress(double d, double d2) {
        int i;
        if (d < d2 + 0.01d || (i = (int) (d * 100.0d)) == 100) {
            return;
        }
        log.info(i + "% complete");
        ScriptState.get().emitProgressUpdatedNotification(i);
    }

    @Override // org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.Launcher
    public void explain(PhysicalPlan physicalPlan, PigContext pigContext, PrintStream printStream, String str, boolean z) throws PlanException, VisitorException, IOException {
        log.trace("Entering MapReduceLauncher.explain");
        MROperPlan compile = compile(physicalPlan, pigContext);
        if (str.equals("text")) {
            MRPrinter mRPrinter = new MRPrinter(printStream, compile);
            mRPrinter.setVerbose(z);
            mRPrinter.visit();
            return;
        }
        printStream.println("#--------------------------------------------------");
        printStream.println("# Map Reduce Plan                                  ");
        printStream.println("#--------------------------------------------------");
        DotMRPrinter dotMRPrinter = new DotMRPrinter(compile, printStream);
        dotMRPrinter.setVerbose(z);
        dotMRPrinter.dump();
        printStream.println("");
    }

    public MROperPlan compile(PhysicalPlan physicalPlan, PigContext pigContext) throws PlanException, IOException, VisitorException {
        MRCompiler mRCompiler = new MRCompiler(physicalPlan, pigContext);
        mRCompiler.randomizeFileLocalizer();
        mRCompiler.compile();
        mRCompiler.aggregateScalarsFiles();
        MROperPlan mRPlan = mRCompiler.getMRPlan();
        mRCompiler.getMessageCollector().logMessages(CompilationMessageCollector.MessageType.Warning, this.aggregateWarning, log);
        String property = pigContext.getProperties().getProperty("last.input.chunksize", POJoinPackage.DEFAULT_CHUNK_SIZE);
        String property2 = pigContext.getProperties().getProperty("pig.exec.nocombiner");
        if (!pigContext.inIllustrator && !"true".equals(property2)) {
            CombinerOptimizer combinerOptimizer = new CombinerOptimizer(mRPlan, Boolean.valueOf(pigContext.getProperties().getProperty(PROP_EXEC_MAP_PARTAGG, "false")).booleanValue());
            combinerOptimizer.visit();
            combinerOptimizer.getMessageCollector().logMessages(CompilationMessageCollector.MessageType.Warning, this.aggregateWarning, log);
        }
        new SampleOptimizer(mRPlan, pigContext).visit();
        LimitAdjuster limitAdjuster = new LimitAdjuster(mRPlan, pigContext);
        limitAdjuster.visit();
        limitAdjuster.adjust();
        String property3 = pigContext.getProperties().getProperty("pig.exec.nosecondarykey");
        if (!pigContext.inIllustrator && !"true".equals(property3)) {
            new SecondaryKeyOptimizer(mRPlan).visit();
        }
        new POPackageAnnotator(mRPlan).visit();
        new MRCompiler.LastInputStreamingOptimizer(mRPlan, property).visit();
        new KeyTypeDiscoveryVisitor(mRPlan).visit();
        new NoopFilterRemover(mRPlan).visit();
        if ("true".equalsIgnoreCase(pigContext.getProperties().getProperty("opt.multiquery", "true"))) {
            new MultiQueryOptimizer(mRPlan, pigContext.inIllustrator).visit();
        }
        new NoopStoreRemover(mRPlan).visit();
        new EndOfAllInputSetter(mRPlan).visit();
        if ("true".equalsIgnoreCase(pigContext.getProperties().getProperty("opt.accumulator", "true"))) {
            new AccumulatorOptimizer(mRPlan).visit();
        }
        return mRPlan;
    }

    private boolean shouldMarkOutputDir(Job job) {
        return job.getJobConf().getBoolean(SUCCESSFUL_JOB_OUTPUT_DIR_MARKER, false);
    }

    private void createSuccessFile(Job job, POStore pOStore) throws IOException {
        if (shouldMarkOutputDir(job)) {
            Path path = new Path(pOStore.getSFile().getFileName());
            FileSystem fileSystem = path.getFileSystem(job.getJobConf());
            if (fileSystem.exists(path)) {
                Path path2 = new Path(path, SUCCEEDED_FILE_NAME);
                if (fileSystem.exists(path2)) {
                    return;
                }
                fileSystem.create(path2).close();
            }
        }
    }

    void computeWarningAggregate(Job job, JobClient jobClient, Map<Enum, Long> map) {
        try {
            RunningJob job2 = jobClient.getJob(job.getAssignedJobID());
            if (job2 != null) {
                Counters counters = job2.getCounters();
                if (counters == null) {
                    map.put(PigWarning.NULL_COUNTER_COUNT, Long.valueOf((map.get(PigWarning.NULL_COUNTER_COUNT) == null ? 0L : map.get(PigWarning.NULL_COUNTER_COUNT).longValue()) + 1));
                }
                for (PigWarning pigWarning : PigWarning.values()) {
                    if (pigWarning != PigWarning.NULL_COUNTER_COUNT) {
                        Long l = map.get(pigWarning);
                        Long valueOf = Long.valueOf(l == null ? 0L : l.longValue());
                        if (counters != null) {
                            valueOf = Long.valueOf(valueOf.longValue() + counters.getCounter(pigWarning));
                        }
                        map.put(pigWarning, valueOf);
                    }
                }
            }
        } catch (IOException e) {
            log.warn("Unable to retrieve job to compute warning aggregation.");
        }
    }
}
