/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pig.backend.hadoop.executionengine.mapReduceLayer;

import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.Serializable;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.JobPriority;
import org.apache.hadoop.mapred.jobcontrol.JobControl;
import org.apache.hadoop.mapreduce.Job;
import org.apache.pig.ComparisonFunc;
import org.apache.pig.ExecType;
import org.apache.pig.LoadFunc;
import org.apache.pig.StoreFuncInterface;
import org.apache.pig.backend.executionengine.ExecException;
import org.apache.pig.backend.hadoop.HDataType;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.DistinctCombiner;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.JobCreationException;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceOper;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.NativeMapReduceOper;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PhyPlanSetter;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigBytesRawComparator;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigCombiner;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigDoubleRawComparator;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigFloatRawComparator;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigInputFormat;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigIntRawComparator;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigLongRawComparator;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigMapOnly;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigMapReduce;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigOutputFormat;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigSecondaryKeyComparator;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigTextRawComparator;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigTupleSortComparator;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.partitioners.SecondaryKeyPartitioner;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.partitioners.SkewedPartitioner;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.partitioners.WeightedRangePartitioner;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.MROperPlan;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POFRJoin;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLoad;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POMergeCogroup;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POMergeJoin;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPackage;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.util.PlanHelper;
import org.apache.pig.data.BagFactory;
import org.apache.pig.data.DataType;
import org.apache.pig.data.Tuple;
import org.apache.pig.data.TupleFactory;
import org.apache.pig.impl.PigContext;
import org.apache.pig.impl.io.FileLocalizer;
import org.apache.pig.impl.io.FileSpec;
import org.apache.pig.impl.io.NullableBytesWritable;
import org.apache.pig.impl.io.NullableDoubleWritable;
import org.apache.pig.impl.io.NullableFloatWritable;
import org.apache.pig.impl.io.NullableIntWritable;
import org.apache.pig.impl.io.NullableLongWritable;
import org.apache.pig.impl.io.NullablePartitionWritable;
import org.apache.pig.impl.io.NullableText;
import org.apache.pig.impl.io.NullableTuple;
import org.apache.pig.impl.io.PigNullableWritable;
import org.apache.pig.impl.plan.DepthFirstWalker;
import org.apache.pig.impl.plan.OperatorKey;
import org.apache.pig.impl.plan.PlanWalker;
import org.apache.pig.impl.plan.VisitorException;
import org.apache.pig.impl.util.JarManager;
import org.apache.pig.impl.util.ObjectSerializer;
import org.apache.pig.impl.util.Pair;
import org.apache.pig.impl.util.UDFContext;
import org.apache.pig.impl.util.UriUtil;
import org.apache.pig.impl.util.Utils;
import org.apache.pig.tools.pigstats.ScriptState;

/*
 * This class specifies class file version 49.0 but uses Java 6 signatures.  Assumed Java 6.
 */
public class JobControlCompiler {
    /** Plan most recently handed to {@code compile}; {@code updateMROpPlan} removes operators from it. */
    MROperPlan plan;
    /** Base Hadoop configuration; {@code compile} passes it to {@code getJob} for every operator. */
    Configuration conf;
    /** Pig context passed to {@code getJob} for every operator compiled. */
    PigContext pigContext;
    private static final Log log = LogFactory.getLog(JobControlCompiler.class);
    /** Directory name appended to an output or temporary path to form the "pig.streaming.log.dir" value. */
    public static final String LOG_DIR = "_logs";
    /** Configuration key set to "true" when the operator reports {@code isEndOfAllInputSetInMap()}. */
    public static final String END_OF_INP_IN_MAP = "pig.invoke.close.in.map";
    /** Configuration key under which the serialized map-side store list is written. */
    public static final String PIG_MAP_STORES = "pig.map.stores";
    /** Configuration key under which the serialized reduce-side store list is written. */
    public static final String PIG_REDUCE_STORES = "pig.reduce.stores";
    /** Per created job: its store operators paired with its temporary output location (the path may be null). */
    private Map<org.apache.hadoop.mapred.jobcontrol.Job, Pair<List<POStore>, Path>> jobStoreMap;
    /** Per job created by {@code compile}: the map-reduce operator it was built from. */
    private Map<org.apache.hadoop.mapred.jobcontrol.Job, MapReduceOper> jobMroMap;

    /**
     * Creates a compiler bound to the given Pig context and base configuration,
     * starting with empty job bookkeeping maps.
     *
     * @param pigContext context forwarded to every job that gets built
     * @param conf       configuration each generated job is derived from
     * @throws IOException declared for callers; nothing in this body raises it
     */
    public JobControlCompiler(PigContext pigContext, Configuration conf) throws IOException {
        this.jobMroMap = new HashMap<org.apache.hadoop.mapred.jobcontrol.Job, MapReduceOper>();
        this.jobStoreMap = new HashMap<org.apache.hadoop.mapred.jobcontrol.Job, Pair<List<POStore>, Path>>();
        this.conf = conf;
        this.pigContext = pigContext;
    }

    /**
     * Looks up the store operators recorded for a job.
     *
     * @param job job whose stores are wanted
     * @return the recorded store list, or a new empty list when the job is unknown
     *         or has no store list recorded
     */
    public List<POStore> getStores(org.apache.hadoop.mapred.jobcontrol.Job job) {
        Pair<List<POStore>, Path> entry = jobStoreMap.get(job);
        boolean hasStores = entry != null && entry.first != null;
        if (!hasStores) {
            return new ArrayList<POStore>();
        }
        return entry.first;
    }

    /**
     * Discards all job bookkeeping and resets the UDF context.
     * Fresh maps are allocated rather than clearing the existing ones, so map
     * objects handed out earlier are left untouched.
     */
    public void reset() {
        jobStoreMap = new HashMap<org.apache.hadoop.mapred.jobcontrol.Job, Pair<List<POStore>, Path>>();
        jobMroMap = new HashMap<org.apache.hadoop.mapred.jobcontrol.Job, MapReduceOper>();
        UDFContext udfContext = UDFContext.getUDFContext();
        udfContext.reset();
    }

    /**
     * @return a read-only view of the job-to-operator map
     */
    Map<org.apache.hadoop.mapred.jobcontrol.Job, MapReduceOper> getJobMroMap() {
        Map<org.apache.hadoop.mapred.jobcontrol.Job, MapReduceOper> readOnlyView =
                Collections.unmodifiableMap(jobMroMap);
        return readOnlyView;
    }

    /**
     * Moves the results of completed jobs out of their temporary locations.
     * Jobs without a recorded temporary location are skipped. For the others,
     * the "abs" and "rel" sub-directories of the temporary location are each
     * processed when they exist.
     *
     * @param completedJobs jobs whose outputs should be moved
     * @throws IOException if a file system operation fails
     */
    public void moveResults(List<org.apache.hadoop.mapred.jobcontrol.Job> completedJobs) throws IOException {
        for (org.apache.hadoop.mapred.jobcontrol.Job finished : completedJobs) {
            Pair<List<POStore>, Path> entry = jobStoreMap.get(finished);
            boolean hasTmpLocation = entry != null && entry.second != null;
            if (hasTmpLocation) {
                Path tmpRoot = entry.second;
                Path absDir = new Path(tmpRoot, "abs");
                Path relDir = new Path(tmpRoot, "rel");
                FileSystem fs = tmpRoot.getFileSystem(conf);
                if (fs.exists(absDir)) {
                    String absPrefix = absDir.toUri().getPath();
                    moveResults(absDir, absPrefix, fs);
                }
                if (fs.exists(relDir)) {
                    // the trailing slash is part of the prefix stripped for "rel"
                    String relPrefix = relDir.toUri().getPath() + "/";
                    moveResults(relDir, relPrefix, fs);
                }
            }
        }
    }

    /**
     * Recursively relocates everything under {@code p}: the destination of each
     * entry is its own path with {@code rem} stripped out. Directories are created
     * at the destination and then descended into; files are renamed.
     *
     * <p>{@code mkdirs} and {@code rename} report failure through their boolean
     * return value; such failures are logged as warnings instead of being dropped
     * silently, and processing continues with the remaining entries.
     *
     * @param p   directory whose contents are moved
     * @param rem path fragment removed from each source path to form the destination
     * @param fs  file system holding both source and destination
     * @throws IOException if a file system operation fails
     */
    private void moveResults(Path p, String rem, FileSystem fs) throws IOException {
        FileStatus[] entries = fs.listStatus(p);
        if (entries == null) {
            // some FileSystem implementations return null for a path that no longer exists
            return;
        }
        for (FileStatus fstat : entries) {
            Path src = fstat.getPath();
            Path dst = this.removePart(src, rem);
            if (fstat.isDir()) {
                log.info("mkdir: " + src);
                if (!fs.mkdirs(dst)) {
                    log.warn("Failed to create directory " + dst);
                }
                this.moveResults(src, rem, fs);
                continue;
            }
            log.info("mv: " + src + " " + dst);
            if (!fs.rename(src, dst)) {
                log.warn("Failed to move " + src + " to " + dst);
            }
        }
    }

    /**
     * Builds a path from the path component of {@code src} with every occurrence
     * of {@code part} removed.
     *
     * @param src  source path
     * @param part fragment to strip
     * @return the resulting path
     */
    private Path removePart(Path src, String part) {
        String fullPath = src.toUri().getPath();
        return new Path(fullPath.replace(part, ""));
    }

    /**
     * Turns the root operators of {@code plan} into Hadoop jobs and collects them
     * in a {@link JobControl}. The plan is remembered in the {@code plan} field and
     * each created job is recorded in {@code jobMroMap} against its operator.
     *
     * @param plan    map-reduce plan whose roots are compiled
     * @param grpName group name given to the returned job control
     * @return the job control holding one job per root, or {@code null} as soon as
     *         a root turns out to be a {@link NativeMapReduceOper}
     * @throws JobCreationException if a job cannot be created; unexpected failures
     *         are wrapped with error code 2017
     */
    public JobControl compile(MROperPlan plan, String grpName) throws JobCreationException {
        this.plan = plan;
        JobControl jobCtrl = new JobControl(grpName);
        try {
            // iterate over a typed copy of the plan's roots
            List<MapReduceOper> roots = new LinkedList<MapReduceOper>(plan.getRoots());
            for (MapReduceOper mro : roots) {
                if (mro instanceof NativeMapReduceOper) {
                    return null;
                }
                org.apache.hadoop.mapred.jobcontrol.Job job = this.getJob(mro, this.conf, this.pigContext);
                this.jobMroMap.put(job, mro);
                jobCtrl.addJob(job);
            }
        }
        catch (JobCreationException jce) {
            throw jce;
        }
        catch (Exception e) {
            int errCode = 2017;
            String msg = "Internal error creating job configuration.";
            throw new JobCreationException(msg, errCode, 4, e);
        }
        return jobCtrl;
    }

    /**
     * Shrinks the plan after a round of jobs has finished. For every job in
     * {@code completeFailedJobs} the plan is trimmed below its operator and the
     * operator itself is removed; operators of all other recorded jobs are simply
     * removed. The job-to-operator map is emptied afterwards.
     *
     * @param completeFailedJobs jobs that failed
     * @return how many operators the plan lost
     */
    public int updateMROpPlan(List<org.apache.hadoop.mapred.jobcontrol.Job> completeFailedJobs) {
        final int initialSize = plan.size();
        for (org.apache.hadoop.mapred.jobcontrol.Job failed : completeFailedJobs) {
            MapReduceOper failedOper = jobMroMap.get(failed);
            plan.trimBelow(failedOper);
            plan.remove(failedOper);
        }
        for (Map.Entry<org.apache.hadoop.mapred.jobcontrol.Job, MapReduceOper> entry : jobMroMap.entrySet()) {
            if (!completeFailedJobs.contains(entry.getKey())) {
                plan.remove(entry.getValue());
            }
        }
        jobMroMap.clear();
        return initialSize - plan.size();
    }

    /*
     * WARNING - void declaration
     */
    private org.apache.hadoop.mapred.jobcontrol.Job getJob(MapReduceOper mro, Configuration config, PigContext pigContext) throws JobCreationException {
        Job nwJob = null;
        try {
            nwJob = new Job(config);
        }
        catch (Exception e) {
            throw new JobCreationException(e);
        }
        Configuration conf = nwJob.getConfiguration();
        ArrayList<FileSpec> inp = new ArrayList<FileSpec>();
        ArrayList inpTargets = new ArrayList();
        ArrayList<String> inpSignatureLists = new ArrayList<String>();
        ArrayList<POStore> storeLocations = new ArrayList<POStore>();
        Path tmpLocation = null;
        String setScriptProp = conf.get("pig.script.info.enabled", "true");
        if (setScriptProp.equalsIgnoreCase("true")) {
            ScriptState ss = ScriptState.get();
            ss.addSettingsToConf(mro, conf);
        }
        conf.set("mapred.mapper.new-api", "true");
        conf.set("mapred.reducer.new-api", "true");
        String buffPercent = conf.get("mapred.job.reduce.markreset.buffer.percent");
        if (buffPercent == null || Double.parseDouble(buffPercent) <= 0.0) {
            log.info((Object)"mapred.job.reduce.markreset.buffer.percent is not set, set to default 0.3");
            conf.set("mapred.job.reduce.markreset.buffer.percent", "0.3");
        } else {
            log.info((Object)("mapred.job.reduce.markreset.buffer.percent is set to " + conf.get("mapred.job.reduce.markreset.buffer.percent")));
        }
        try {
            LinkedList<POLoad> lds = PlanHelper.getLoads(mro.mapPlan);
            if (lds != null && lds.size() > 0) {
                for (POLoad ld : lds) {
                    LoadFunc lf = ld.getLoadFunc();
                    if (lf != null) {
                        lf.setLocation(ld.getLFile().getFileName(), nwJob);
                    }
                    inp.add(ld.getLFile());
                    List<POLoad> ldSucs = mro.mapPlan.getSuccessors(ld);
                    ArrayList<OperatorKey> ldSucKeys = new ArrayList<OperatorKey>();
                    if (ldSucs != null) {
                        for (PhysicalOperator physicalOperator : ldSucs) {
                            ldSucKeys.add(physicalOperator.getOperatorKey());
                        }
                    }
                    inpTargets.add(ldSucKeys);
                    inpSignatureLists.add(ld.getSignature());
                    mro.mapPlan.remove(ld);
                }
            }
            File submitJarFile = File.createTempFile("Job", ".jar");
            submitJarFile.deleteOnExit();
            FileOutputStream fos = new FileOutputStream(submitJarFile);
            JarManager.createJar(fos, mro.UDFs, pigContext);
            conf.set("mapred.jar", submitJarFile.getPath());
            conf.set("pig.inputs", ObjectSerializer.serialize(inp));
            conf.set("pig.inpTargets", ObjectSerializer.serialize(inpTargets));
            conf.set("pig.inpSignatures", ObjectSerializer.serialize(inpSignatureLists));
            conf.set("pig.pigContext", ObjectSerializer.serialize(pigContext));
            conf.set("udf.import.list", ObjectSerializer.serialize(PigContext.getPackageImportList()));
            if (System.getProperty("mapred.job.name") == null && pigContext.getProperties().getProperty("jobName") != null) {
                nwJob.setJobName(pigContext.getProperties().getProperty("jobName"));
            }
            if (pigContext.getProperties().getProperty("jobPriority") != null) {
                String jobPriority = pigContext.getProperties().getProperty("jobPriority").toUpperCase();
                try {
                    conf.set("mapred.job.priority", JobPriority.valueOf((String)jobPriority).toString());
                }
                catch (IllegalArgumentException e) {
                    void var20_31;
                    StringBuffer sb = new StringBuffer("The job priority must be one of [");
                    JobPriority[] priorities = JobPriority.values();
                    boolean bl = false;
                    while (var20_31 < priorities.length) {
                        if (var20_31 > 0) {
                            sb.append(", ");
                        }
                        sb.append(priorities[var20_31]);
                        ++var20_31;
                    }
                    sb.append("].  You specified [" + jobPriority + "]");
                    throw new JobCreationException(sb.toString());
                }
            }
            JobControlCompiler.setupDistributedCache(pigContext, nwJob.getConfiguration(), pigContext.getProperties(), "pig.streaming.ship.files", true);
            JobControlCompiler.setupDistributedCache(pigContext, nwJob.getConfiguration(), pigContext.getProperties(), "pig.streaming.cache.files", false);
            nwJob.setInputFormatClass(PigInputFormat.class);
            LinkedList<POStore> mapStores = PlanHelper.getStores(mro.mapPlan);
            LinkedList<POStore> reduceStores = PlanHelper.getStores(mro.reducePlan);
            for (POStore st : mapStores) {
                storeLocations.add(st);
                StoreFuncInterface storeFuncInterface = st.getStoreFunc();
                storeFuncInterface.setStoreLocation(st.getSFile().getFileName(), nwJob);
            }
            for (POStore st : reduceStores) {
                storeLocations.add(st);
                StoreFuncInterface storeFuncInterface = st.getStoreFunc();
                storeFuncInterface.setStoreLocation(st.getSFile().getFileName(), nwJob);
            }
            nwJob.setOutputFormatClass(PigOutputFormat.class);
            if (mapStores.size() + reduceStores.size() == 1) {
                POStore st;
                log.info((Object)"Setting up single store job");
                if (reduceStores.isEmpty()) {
                    st = mapStores.get(0);
                    mro.mapPlan.remove(st);
                } else {
                    st = reduceStores.get(0);
                    mro.reducePlan.remove(st);
                }
                String outputPathString = st.getSFile().getFileName();
                if (!outputPathString.contains("://") || outputPathString.startsWith("hdfs://")) {
                    conf.set("pig.streaming.log.dir", new Path(outputPathString, LOG_DIR).toString());
                } else {
                    String string = FileLocalizer.getTemporaryPath(pigContext).toString();
                    tmpLocation = new Path(string);
                    conf.set("pig.streaming.log.dir", new Path(tmpLocation, LOG_DIR).toString());
                }
                conf.set("pig.streaming.task.output.dir", outputPathString);
            } else {
                log.info((Object)"Setting up multi store job");
                String tmpLocationStr = FileLocalizer.getTemporaryPath(pigContext).toString();
                tmpLocation = new Path(tmpLocationStr);
                nwJob.setOutputFormatClass(PigOutputFormat.class);
                for (POStore pOStore : storeLocations) {
                    pOStore.setMultiStore(true);
                }
                conf.set("pig.streaming.log.dir", new Path(tmpLocation, LOG_DIR).toString());
                conf.set("pig.streaming.task.output.dir", tmpLocation.toString());
            }
            conf.set("pig.map.keytype", ObjectSerializer.serialize((Serializable)new byte[]{mro.mapKeyType}));
            new PhyPlanSetter(mro.mapPlan).visit();
            new PhyPlanSetter(mro.reducePlan).visit();
            this.setupDistributedCacheForJoin(mro, pigContext, conf);
            POPackage pack = null;
            if (mro.reducePlan.isEmpty()) {
                nwJob.setMapperClass(PigMapOnly.Map.class);
                nwJob.setNumReduceTasks(0);
                conf.set("pig.mapPlan", ObjectSerializer.serialize(mro.mapPlan));
                if (mro.isEndOfAllInputSetInMap()) {
                    conf.set(END_OF_INP_IN_MAP, "true");
                }
            } else {
                if (!mro.combinePlan.isEmpty()) {
                    POPackage combPack = (POPackage)mro.combinePlan.getRoots().get(0);
                    mro.combinePlan.remove(combPack);
                    nwJob.setCombinerClass(PigCombiner.Combine.class);
                    conf.set("pig.combinePlan", ObjectSerializer.serialize(mro.combinePlan));
                    conf.set("pig.combine.package", ObjectSerializer.serialize(combPack));
                } else if (mro.needsDistinctCombiner()) {
                    nwJob.setCombinerClass(DistinctCombiner.Combine.class);
                    log.info((Object)"Setting identity combiner class.");
                }
                pack = (POPackage)mro.reducePlan.getRoots().get(0);
                mro.reducePlan.remove(pack);
                nwJob.setMapperClass(PigMapReduce.Map.class);
                nwJob.setReducerClass(PigMapReduce.Reduce.class);
                if (mro.requestedParallelism > 0) {
                    nwJob.setNumReduceTasks(mro.requestedParallelism);
                } else if (pigContext.defaultParallel > 0) {
                    conf.set("mapred.reduce.tasks", "" + pigContext.defaultParallel);
                } else {
                    JobControlCompiler.estimateNumberOfReducers(conf, lds);
                }
                if (mro.customPartitioner != null) {
                    nwJob.setPartitionerClass(PigContext.resolveClassName(mro.customPartitioner));
                }
                conf.set("pig.mapPlan", ObjectSerializer.serialize(mro.mapPlan));
                if (mro.isEndOfAllInputSetInMap()) {
                    conf.set(END_OF_INP_IN_MAP, "true");
                }
                conf.set("pig.reducePlan", ObjectSerializer.serialize(mro.reducePlan));
                if (mro.isEndOfAllInputSetInReduce()) {
                    conf.set("pig.stream.in.reduce", "true");
                }
                conf.set("pig.reduce.package", ObjectSerializer.serialize(pack));
                conf.set("pig.reduce.key.type", Byte.toString(pack.getKeyType()));
                if (mro.getUseSecondaryKey()) {
                    nwJob.setGroupingComparatorClass(PigSecondaryKeyGroupComparator.class);
                    nwJob.setPartitionerClass(SecondaryKeyPartitioner.class);
                    nwJob.setSortComparatorClass(PigSecondaryKeyComparator.class);
                    nwJob.setOutputKeyClass(NullableTuple.class);
                    conf.set("pig.secondarySortOrder", ObjectSerializer.serialize((Serializable)mro.getSecondarySortOrder()));
                } else {
                    Class<?> keyClass = HDataType.getWritableComparableTypes(pack.getKeyType()).getClass();
                    nwJob.setOutputKeyClass(keyClass);
                    this.selectComparator(mro, pack.getKeyType(), nwJob);
                }
                nwJob.setOutputValueClass(NullableTuple.class);
            }
            if (mro.isGlobalSort() || mro.isLimitAfterSort()) {
                if (mro.isGlobalSort()) {
                    String symlink = JobControlCompiler.addSingleFileToDistributedCache(pigContext, conf, mro.getQuantFile(), "pigsample");
                    conf.set("pig.quantilesFile", symlink);
                    nwJob.setPartitionerClass(WeightedRangePartitioner.class);
                }
                if (mro.isUDFComparatorUsed) {
                    boolean usercomparator = false;
                    for (String compFuncSpec : mro.UDFs) {
                        Class comparator = PigContext.resolveClassName(compFuncSpec);
                        if (!ComparisonFunc.class.isAssignableFrom(comparator)) continue;
                        nwJob.setMapperClass(PigMapReduce.MapWithComparator.class);
                        nwJob.setReducerClass(PigMapReduce.ReduceWithComparator.class);
                        conf.set("pig.reduce.package", ObjectSerializer.serialize(pack));
                        conf.set("pig.usercomparator", "true");
                        nwJob.setOutputKeyClass(NullableTuple.class);
                        nwJob.setSortComparatorClass(comparator);
                        usercomparator = true;
                        break;
                    }
                    if (!usercomparator) {
                        String string = "Internal error. Can't find the UDF comparator";
                        throw new IOException(string);
                    }
                } else {
                    conf.set("pig.sortOrder", ObjectSerializer.serialize((Serializable)mro.getSortOrder()));
                }
            }
            if (mro.isSkewedJoin()) {
                String symlink = JobControlCompiler.addSingleFileToDistributedCache(pigContext, conf, mro.getSkewedJoinPartitionFile(), "pigdistkey");
                conf.set("pig.keyDistFile", symlink);
                nwJob.setPartitionerClass(SkewedPartitioner.class);
                nwJob.setMapperClass(PigMapReduce.MapWithPartitionIndex.class);
                nwJob.setMapOutputKeyClass(NullablePartitionWritable.class);
                nwJob.setGroupingComparatorClass(PigGroupingPartitionWritableComparator.class);
            }
            for (POStore pOStore : mapStores) {
                pOStore.setInputs(null);
                pOStore.setParentPlan(null);
            }
            for (POStore pOStore : reduceStores) {
                pOStore.setInputs(null);
                pOStore.setParentPlan(null);
            }
            if (Utils.tmpFileCompression(pigContext)) {
                conf.setBoolean("pig.tmpfilecompression", true);
                conf.set("pig.tmpfilecompression.codec", Utils.tmpFileCompressionCodec(pigContext));
            }
            conf.set(PIG_MAP_STORES, ObjectSerializer.serialize(mapStores));
            conf.set(PIG_REDUCE_STORES, ObjectSerializer.serialize(reduceStores));
            long l = 0L;
            if (!mro.combineSmallSplits() || pigContext.getProperties().getProperty("pig.splitCombination", "true").equals("false")) {
                conf.setBoolean("pig.noSplitCombination", true);
            } else {
                String tmp = pigContext.getProperties().getProperty("pig.maxCombinedSplitSize", null);
                if (tmp != null) {
                    try {
                        l = Long.parseLong(tmp);
                    }
                    catch (NumberFormatException e) {
                        log.warn((Object)"Invalid numeric format for pig.maxCombinedSplitSize; use the default maximum combined split size");
                    }
                }
            }
            if (l > 0L) {
                conf.setLong("pig.maxCombinedSplitSize", l);
            }
            UDFContext.getUDFContext().serialize(conf);
            org.apache.hadoop.mapred.jobcontrol.Job cjob = new org.apache.hadoop.mapred.jobcontrol.Job(new JobConf(nwJob.getConfiguration()), new ArrayList());
            this.jobStoreMap.put(cjob, new Pair(storeLocations, tmpLocation));
            return cjob;
        }
        catch (JobCreationException jce) {
            throw jce;
        }
        catch (Exception e) {
            int errCode = 2017;
            String msg = "Internal error creating job configuration.";
            throw new JobCreationException(msg, errCode, 4, e);
        }
    }

    /**
     * Estimates the reducer count from the total input size and writes it to
     * "mapred.reduce.tasks". The estimate is the input size divided by
     * "pig.exec.reducers.bytes.per.reducer" (default 1000000000), rounded up,
     * raised to at least 1 and then capped at "pig.exec.reducers.max" (default 999).
     *
     * @param conf configuration read for the limits and updated with the result
     * @param lds  load operators whose inputs are measured
     * @return the reducer count that was set
     * @throws IOException if the input size cannot be determined
     */
    static int estimateNumberOfReducers(Configuration conf, List<POLoad> lds) throws IOException {
        final long bytesPerReducer = conf.getLong("pig.exec.reducers.bytes.per.reducer", 1000000000L);
        final int maxReducers = conf.getInt("pig.exec.reducers.max", 999);
        final long totalInputFileSize = JobControlCompiler.getTotalInputFileSize(conf, lds);
        log.info("BytesPerReducer=" + bytesPerReducer + " maxReducers=" + maxReducers + " totalInputFileSize=" + totalInputFileSize);
        double rawEstimate = Math.ceil((double)totalInputFileSize / (double)bytesPerReducer);
        // floor of 1 first, then the configured ceiling
        int reducers = Math.min(maxReducers, Math.max(1, (int)rawEstimate));
        conf.setInt("mapred.reduce.tasks", reducers);
        log.info("Neither PARALLEL nor default parallelism is set for this job. Setting number of reducers to " + reducers);
        return reducers;
    }

    /**
     * Sums the sizes of all inputs of the given loads. Inputs and individual
     * locations that {@code UriUtil.isHDFSFileOrLocal} rejects contribute nothing.
     *
     * <p>The file system is resolved from each location rather than taken from the
     * configuration's default, so a location naming a different file system than
     * the default one is globbed against its own file system.
     *
     * @param conf configuration used to resolve file systems
     * @param lds  load operators to measure; may be {@code null} or empty
     * @return total size in bytes, 0 when there is nothing to measure
     * @throws IOException if a file system lookup fails
     */
    private static long getTotalInputFileSize(Configuration conf, List<POLoad> lds) throws IOException {
        if (lds == null || lds.isEmpty()) {
            return 0L;
        }
        long size = 0L;
        for (POLoad ld : lds) {
            String input = ld.getLFile().getFileName();
            if (!UriUtil.isHDFSFileOrLocal(input)) {
                continue;
            }
            for (String location : LoadFunc.getPathStrings(input)) {
                if (!UriUtil.isHDFSFileOrLocal(location)) {
                    continue;
                }
                Path path = new Path(location);
                FileSystem fs = path.getFileSystem(conf);
                FileStatus[] status = fs.globStatus(path);
                if (status == null) {
                    // the glob matched nothing
                    continue;
                }
                for (FileStatus s : status) {
                    size += JobControlCompiler.getPathLength(fs, s);
                }
            }
        }
        return size;
    }

    /**
     * Computes the size of a file, or the combined size of everything beneath a
     * directory (recursively).
     *
     * @param fs     file system used to list directories
     * @param status file or directory to measure
     * @return size in bytes
     * @throws IOException if a directory cannot be listed
     */
    private static long getPathLength(FileSystem fs, FileStatus status) throws IOException {
        if (status.isDir()) {
            long total = 0L;
            for (FileStatus entry : fs.listStatus(status.getPath())) {
                total += JobControlCompiler.getPathLength(fs, entry);
            }
            return total;
        }
        return status.getLen();
    }

    /**
     * Chooses the sort (and, for non-ordered jobs, grouping) comparator for the
     * job based on the key type code.
     *
     * <p>A job counts as "ordered" when the operator is a global sort, a limit
     * after sort, uses a typed comparator, or when its first successor in the plan
     * is a global sort. Ordered jobs get a raw sort comparator only, and key types
     * without a case are left untouched. Other jobs get both a sort and a grouping
     * comparator, and an unknown key type is an error.
     *
     * @param mro     operator the job is built from
     * @param keyType key type code (as understood by {@code DataType.findTypeName})
     * @param job     job to configure
     * @throws JobCreationException for map or bag keys (error 1068), or for an
     *         unhandled key type on a non-ordered job (error 2036)
     */
    private void selectComparator(MapReduceOper mro, byte keyType, Job job) throws JobCreationException {
        String msg;
        boolean hasOrderBy = false;
        if (mro.isGlobalSort() || mro.isLimitAfterSort() || mro.usingTypedComparator()) {
            hasOrderBy = true;
        } else {
            List<MapReduceOper> succs = this.plan.getSuccessors(mro);
            // An empty successor list is treated like a missing one instead of failing on get(0).
            if (succs != null && !succs.isEmpty() && succs.get(0).isGlobalSort()) {
                hasOrderBy = true;
            }
        }
        if (hasOrderBy) {
            switch (keyType) {
                case 10: { // int keys
                    job.setSortComparatorClass(PigIntRawComparator.class);
                    break;
                }
                case 15: { // long keys
                    job.setSortComparatorClass(PigLongRawComparator.class);
                    break;
                }
                case 20: { // float keys
                    job.setSortComparatorClass(PigFloatRawComparator.class);
                    break;
                }
                case 25: { // double keys
                    job.setSortComparatorClass(PigDoubleRawComparator.class);
                    break;
                }
                case 55: { // text keys
                    job.setSortComparatorClass(PigTextRawComparator.class);
                    break;
                }
                case 50: { // bytes keys
                    job.setSortComparatorClass(PigBytesRawComparator.class);
                    break;
                }
                case 100: {
                    int errCode = 1068;
                    msg = "Using Map as key not supported.";
                    throw new JobCreationException(msg, errCode, 2);
                }
                case 110: { // tuple keys
                    job.setSortComparatorClass(PigTupleSortComparator.class);
                    break;
                }
                case 120: {
                    int errCode = 1068;
                    msg = "Using Bag as key not supported.";
                    throw new JobCreationException(msg, errCode, 2);
                }
            }
            return;
        }
        switch (keyType) {
            case 10: { // int keys
                job.setSortComparatorClass(PigIntWritableComparator.class);
                job.setGroupingComparatorClass(PigGroupingIntWritableComparator.class);
                break;
            }
            case 15: { // long keys
                job.setSortComparatorClass(PigLongWritableComparator.class);
                job.setGroupingComparatorClass(PigGroupingLongWritableComparator.class);
                break;
            }
            case 20: { // float keys
                job.setSortComparatorClass(PigFloatWritableComparator.class);
                job.setGroupingComparatorClass(PigGroupingFloatWritableComparator.class);
                break;
            }
            case 25: { // double keys
                job.setSortComparatorClass(PigDoubleWritableComparator.class);
                job.setGroupingComparatorClass(PigGroupingDoubleWritableComparator.class);
                break;
            }
            case 55: { // char-array keys
                job.setSortComparatorClass(PigCharArrayWritableComparator.class);
                job.setGroupingComparatorClass(PigGroupingCharArrayWritableComparator.class);
                break;
            }
            case 50: { // keys handled by the "DBA" comparators
                job.setSortComparatorClass(PigDBAWritableComparator.class);
                job.setGroupingComparatorClass(PigGroupingDBAWritableComparator.class);
                break;
            }
            case 100: {
                int errCode = 1068;
                msg = "Using Map as key not supported.";
                throw new JobCreationException(msg, errCode, 2);
            }
            case 110: { // tuple keys
                job.setSortComparatorClass(PigTupleWritableComparator.class);
                job.setGroupingComparatorClass(PigGroupingTupleWritableComparator.class);
                break;
            }
            case 120: {
                int errCode = 1068;
                msg = "Using Bag as key not supported.";
                throw new JobCreationException(msg, errCode, 2);
            }
            default: {
                int errCode = 2036;
                msg = "Unhandled key type " + DataType.findTypeName(keyType);
                throw new JobCreationException(msg, errCode, 4);
            }
        }
    }

    /**
     * Runs a {@link JoinDistributedCacheVisitor} over the map plan of {@code mro}
     * and then over its reduce plan.
     *
     * @param mro operator whose {@code mapPlan} and {@code reducePlan} are walked
     * @param pigContext handed unchanged to each visitor
     * @param conf handed unchanged to each visitor
     * @throws IOException if either visit fails
     */
    private void setupDistributedCacheForJoin(MapReduceOper mro, PigContext pigContext, Configuration conf) throws IOException {
        // Map side first; the reduce plan is only read after the map visit has finished.
        JoinDistributedCacheVisitor mapSideVisitor = new JoinDistributedCacheVisitor(mro.mapPlan, pigContext, conf);
        mapSideVisitor.visit();

        JoinDistributedCacheVisitor reduceSideVisitor = new JoinDistributedCacheVisitor(mro.reducePlan, pigContext, conf);
        reduceSideVisitor.visit();
    }

    /**
     * Reads a comma-separated list of paths from {@code properties} and forwards
     * it to the {@code String[]} overload of {@code setupDistributedCache}.
     * Does nothing when the property is not set.
     *
     * @param pigContext forwarded to the array overload
     * @param conf forwarded to the array overload
     * @param properties property set to look {@code key} up in
     * @param key name of the property holding the comma-separated paths
     * @param shipToCluster forwarded to the array overload
     * @throws IOException propagated from the array overload
     */
    private static void setupDistributedCache(PigContext pigContext, Configuration conf, Properties properties, String key, boolean shipToCluster) throws IOException {
        String commaSeparated = properties.getProperty(key);
        if (commaSeparated == null) {
            // Property absent: nothing to register.
            return;
        }
        // Entries are split on ',' only; trimming and blank-skipping happen in the array overload.
        JobControlCompiler.setupDistributedCache(pigContext, conf, commaSeparated.split(","), shipToCluster);
    }

    /**
     * Registers each entry of {@code paths} with the Hadoop distributed cache.
     * <p>
     * {@code DistributedCache.createSymlink} is called once on {@code conf}. Each
     * entry is then trimmed, and entries that are empty after trimming are skipped.
     * Every remaining entry must parse as a {@link URI}. When {@code shipToCluster}
     * is {@code false} that URI is added to the cache as is. When it is
     * {@code true} the file is first copied from the local file system to a
     * temporary path obtained from {@code FileLocalizer}, and the temporary path,
     * suffixed with {@code #} and the source file name, is added instead.
     *
     * @param pigContext used to obtain the temporary path and the error source
     * @param conf job configuration the cache entries are recorded in
     * @param paths cache entries; surrounding whitespace is ignored
     * @param shipToCluster whether to copy each file to a temporary location first
     * @throws IOException if the copy fails, or (as {@code ExecException}) if an
     *         entry cannot be turned into a URI; the {@link URISyntaxException}
     *         is attached as the cause
     */
    private static void setupDistributedCache(PigContext pigContext, Configuration conf, String[] paths, boolean shipToCluster) throws IOException {
        DistributedCache.createSymlink(conf);
        for (String rawPath : paths) {
            String path = rawPath.trim();
            if (path.length() == 0) {
                continue;
            }
            Path src = new Path(path);
            // The source is validated as a URI even when it is about to be shipped.
            URI srcURI;
            try {
                srcURI = new URI(src.toString());
            }
            catch (URISyntaxException ue) {
                int errCode = 6003;
                String msg = "Invalid cache specification. File doesn't exist: " + src;
                // Chain the syntax error so the real reason is not lost; the error
                // source stays 8, now spelled as a byte to match the constructor.
                throw new ExecException(msg, errCode, (byte) 8, ue);
            }
            if (!shipToCluster) {
                DistributedCache.addCacheFile(srcURI, conf);
                continue;
            }
            Path dst = new Path(FileLocalizer.getTemporaryPath(pigContext).toString());
            FileSystem fs = dst.getFileSystem(conf);
            fs.copyFromLocalFile(src, dst);
            URI dstURI;
            try {
                // The fragment after '#' is the source file's own name.
                dstURI = new URI(dst.toString() + "#" + src.getName());
            }
            catch (URISyntaxException ue) {
                byte errSrc = pigContext.getErrorSource();
                int errCode;
                // NOTE(review): 16 and 8 look like PigException error-source constants; confirm.
                switch (errSrc) {
                    case 16: {
                        errCode = 6004;
                        break;
                    }
                    case 8: {
                        errCode = 4004;
                        break;
                    }
                    default: {
                        errCode = 2037;
                    }
                }
                String msg = "Invalid ship specification. File doesn't exist: " + dst;
                throw new ExecException(msg, errCode, errSrc, ue);
            }
            DistributedCache.addCacheFile(dstURI, conf);
        }
    }

    /**
     * Adds one existing file to the distributed cache under a generated symlink name.
     * <p>
     * When the exec type is {@code ExecType.LOCAL} nothing is registered and
     * {@code filename} is returned unchanged. Otherwise a symlink name is built
     * from {@code prefix}, the identity hash code of {@code filename} and the
     * current time in milliseconds; {@code filename#symlink} is registered through
     * {@code setupDistributedCache} with {@code shipToCluster} set to {@code false}.
     *
     * @param pigContext used for the existence check and the exec type
     * @param conf job configuration the cache entry is recorded in
     * @param filename file to register; must exist according to {@code FileLocalizer}
     * @param prefix leading part of the generated symlink name
     * @return the generated symlink name, or {@code filename} in local mode
     * @throws IOException if the file does not exist or registration fails
     */
    private static String addSingleFileToDistributedCache(PigContext pigContext, Configuration conf, String filename, String prefix) throws IOException {
        boolean present = FileLocalizer.fileExists(filename, pigContext);
        if (!present) {
            // NOTE(review): the text names the skew join partition file, but callers in
            // this file also pass merge-join and merge-cogroup index files.
            throw new IOException("Internal error: skew join partition file " + filename + " does not exist");
        }
        if (pigContext.getExecType() == ExecType.LOCAL) {
            return filename;
        }
        String linkName = prefix + "_" + System.identityHashCode(filename) + "_" + System.currentTimeMillis();
        String cacheEntry = filename + "#" + linkName;
        JobControlCompiler.setupDistributedCache(pigContext, conf, new String[]{cacheEntry}, false);
        return linkName;
    }

    private static class JoinDistributedCacheVisitor
    extends PhyPlanVisitor {
        private PigContext pigContext = null;
        private Configuration conf = null;

        public JoinDistributedCacheVisitor(PhysicalPlan plan, PigContext pigContext, Configuration conf) {
            super(plan, (PlanWalker<PhysicalOperator, PhysicalPlan>)new DepthFirstWalker<PhysicalOperator, PhysicalPlan>(plan));
            this.pigContext = pigContext;
            this.conf = conf;
        }

        public void visitFRJoin(POFRJoin join) throws VisitorException {
            if (this.pigContext.getExecType() == ExecType.LOCAL) {
                return;
            }
            FileSpec[] replFiles = join.getReplFiles();
            ArrayList<String> replicatedPath = new ArrayList<String>();
            FileSpec[] newReplFiles = new FileSpec[replFiles.length];
            for (int i = 0; i < replFiles.length; ++i) {
                String symlink = "";
                if (i != join.getFragment()) {
                    symlink = "pigrepl_" + join.getOperatorKey().toString() + "_" + Integer.toString(System.identityHashCode(replFiles[i].getFileName())) + "_" + Long.toString(System.currentTimeMillis()) + "_" + i;
                    replicatedPath.add(replFiles[i].getFileName() + "#" + symlink);
                }
                newReplFiles[i] = new FileSpec(symlink, replFiles[i] == null ? null : replFiles[i].getFuncSpec());
            }
            join.setReplFiles(newReplFiles);
            try {
                JobControlCompiler.setupDistributedCache(this.pigContext, this.conf, replicatedPath.toArray(new String[0]), false);
            }
            catch (IOException e) {
                String msg = "Internal error. Distributed cache could not be set up for the replicated files";
                throw new VisitorException(msg, e);
            }
        }

        public void visitMergeJoin(POMergeJoin join) throws VisitorException {
            if (this.pigContext.getExecType() == ExecType.LOCAL) {
                return;
            }
            String indexFile = join.getIndexFile();
            if (indexFile == null) {
                return;
            }
            try {
                String symlink = JobControlCompiler.addSingleFileToDistributedCache(this.pigContext, this.conf, indexFile, "indexfile_");
                join.setIndexFile(symlink);
            }
            catch (IOException e) {
                String msg = "Internal error. Distributed cache could not be set up for merge join index file";
                throw new VisitorException(msg, e);
            }
        }

        public void visitMergeCoGroup(POMergeCogroup mergeCoGrp) throws VisitorException {
            if (this.pigContext.getExecType() == ExecType.LOCAL) {
                return;
            }
            String indexFile = mergeCoGrp.getIndexFileName();
            if (indexFile == null) {
                throw new VisitorException("No index file");
            }
            try {
                String symlink = JobControlCompiler.addSingleFileToDistributedCache(this.pigContext, this.conf, indexFile, "indexfile_mergecogrp_");
                mergeCoGrp.setIndexFileName(symlink);
            }
            catch (IOException e) {
                String msg = "Internal error. Distributed cache could not be set up for merge cogrp index file";
                throw new VisitorException(msg, e);
            }
        }
    }

    /**
     * Grouping comparator for bag keys. Adds no comparison logic of its own;
     * all behaviour is inherited from {@link WritableComparator}.
     */
    public static class PigGroupingBagWritableComparator extends WritableComparator {
        /**
         * Uses the runtime class of a freshly created default bag as the key class
         * and passes {@code true} as the superclass's boolean argument
         * ({@code createInstances} in Hadoop's API).
         */
        public PigGroupingBagWritableComparator() {
            super(BagFactory.getInstance().newDefaultBag().getClass(), true);
        }
    }

    /**
     * Grouping comparator for {@code NullablePartitionWritable} keys. Adds no
     * comparison logic of its own; all behaviour is inherited from
     * {@link WritableComparator}.
     */
    public static class PigGroupingPartitionWritableComparator extends WritableComparator {
        /**
         * Registers {@code NullablePartitionWritable} as the key class, with the
         * superclass's boolean argument ({@code createInstances}) set to {@code true}.
         */
        public PigGroupingPartitionWritableComparator() {
            super(NullablePartitionWritable.class, true);
        }
    }

    /**
     * Grouping comparator for {@code NullableTuple} keys. The comparator-selection
     * switch in this file installs it with {@code job.setGroupingComparatorClass}
     * for key type code 110. Comparison behaviour is inherited unchanged from
     * {@link WritableComparator}.
     */
    public static class PigGroupingTupleWritableComparator extends WritableComparator {
        /**
         * Registers {@code NullableTuple} as the key class, with the superclass's
         * boolean argument ({@code createInstances}) set to {@code true}.
         */
        public PigGroupingTupleWritableComparator() {
            super(NullableTuple.class, true);
        }
    }

    /**
     * Grouping comparator for {@code NullableBytesWritable} keys. The
     * comparator-selection switch in this file installs it with
     * {@code job.setGroupingComparatorClass} for key type code 50. Comparison
     * behaviour is inherited unchanged from {@link WritableComparator}.
     */
    public static class PigGroupingDBAWritableComparator extends WritableComparator {
        /**
         * Registers {@code NullableBytesWritable} as the key class, with the
         * superclass's boolean argument ({@code createInstances}) set to {@code true}.
         */
        public PigGroupingDBAWritableComparator() {
            super(NullableBytesWritable.class, true);
        }
    }

    /**
     * Grouping comparator for {@code NullableText} keys. The comparator-selection
     * switch in this file installs it with {@code job.setGroupingComparatorClass}
     * for key type code 55. Comparison behaviour is inherited unchanged from
     * {@link WritableComparator}.
     */
    public static class PigGroupingCharArrayWritableComparator extends WritableComparator {
        /**
         * Registers {@code NullableText} as the key class, with the superclass's
         * boolean argument ({@code createInstances}) set to {@code true}.
         */
        public PigGroupingCharArrayWritableComparator() {
            super(NullableText.class, true);
        }
    }

    /**
     * Grouping comparator for {@code NullableDoubleWritable} keys. The
     * comparator-selection switch in this file installs it with
     * {@code job.setGroupingComparatorClass} for key type code 25. Comparison
     * behaviour is inherited unchanged from {@link WritableComparator}.
     */
    public static class PigGroupingDoubleWritableComparator extends WritableComparator {
        /**
         * Registers {@code NullableDoubleWritable} as the key class, with the
         * superclass's boolean argument ({@code createInstances}) set to {@code true}.
         */
        public PigGroupingDoubleWritableComparator() {
            super(NullableDoubleWritable.class, true);
        }
    }

    /**
     * Grouping comparator for {@code NullableFloatWritable} keys. The
     * comparator-selection switch in this file installs it with
     * {@code job.setGroupingComparatorClass} for key type code 20. Comparison
     * behaviour is inherited unchanged from {@link WritableComparator}.
     */
    public static class PigGroupingFloatWritableComparator extends WritableComparator {
        /**
         * Registers {@code NullableFloatWritable} as the key class, with the
         * superclass's boolean argument ({@code createInstances}) set to {@code true}.
         */
        public PigGroupingFloatWritableComparator() {
            super(NullableFloatWritable.class, true);
        }
    }

    /**
     * Grouping comparator for {@code NullableLongWritable} keys. The
     * comparator-selection switch in this file installs it with
     * {@code job.setGroupingComparatorClass} for key type code 15. Comparison
     * behaviour is inherited unchanged from {@link WritableComparator}.
     */
    public static class PigGroupingLongWritableComparator extends WritableComparator {
        /**
         * Registers {@code NullableLongWritable} as the key class, with the
         * superclass's boolean argument ({@code createInstances}) set to {@code true}.
         */
        public PigGroupingLongWritableComparator() {
            super(NullableLongWritable.class, true);
        }
    }

    /**
     * Grouping comparator for {@code NullableIntWritable} keys. The
     * comparator-selection switch in this file installs it with
     * {@code job.setGroupingComparatorClass} for key type code 10. Comparison
     * behaviour is inherited unchanged from {@link WritableComparator}.
     */
    public static class PigGroupingIntWritableComparator extends WritableComparator {
        /**
         * Registers {@code NullableIntWritable} as the key class, with the
         * superclass's boolean argument ({@code createInstances}) set to {@code true}.
         */
        public PigGroupingIntWritableComparator() {
            super(NullableIntWritable.class, true);
        }
    }

    /**
     * Comparator for bag keys that orders them by their serialized bytes, using
     * the raw-byte {@code compare} inherited from {@link PigWritableComparator}.
     */
    public static class PigBagWritableComparator extends PigWritableComparator {
        /** Uses the runtime class of a freshly created default bag as the key class. */
        public PigBagWritableComparator() {
            super(BagFactory.getInstance().newDefaultBag().getClass());
        }
    }

    /**
     * Sort comparator for tuple keys that orders them by their serialized bytes,
     * using the raw-byte {@code compare} inherited from {@link PigWritableComparator}.
     * The comparator-selection switch in this file installs it with
     * {@code job.setSortComparatorClass} for key type code 110.
     */
    public static class PigTupleWritableComparator extends PigWritableComparator {
        /** Uses the tuple class reported by the {@code TupleFactory} singleton as the key class. */
        public PigTupleWritableComparator() {
            super(TupleFactory.getInstance().tupleClass());
        }
    }

    /**
     * Sort comparator for {@code NullableBytesWritable} keys that orders them by
     * their serialized bytes, using the raw-byte {@code compare} inherited from
     * {@link PigWritableComparator}. The comparator-selection switch in this file
     * installs it with {@code job.setSortComparatorClass} for key type code 50.
     */
    public static class PigDBAWritableComparator extends PigWritableComparator {
        /** Registers {@code NullableBytesWritable} as the key class. */
        public PigDBAWritableComparator() {
            super(NullableBytesWritable.class);
        }
    }

    /**
     * Sort comparator for {@code NullableText} keys that orders them by their
     * serialized bytes, using the raw-byte {@code compare} inherited from
     * {@link PigWritableComparator}. The comparator-selection switch in this file
     * installs it with {@code job.setSortComparatorClass} for key type code 55.
     */
    public static class PigCharArrayWritableComparator extends PigWritableComparator {
        /** Registers {@code NullableText} as the key class. */
        public PigCharArrayWritableComparator() {
            super(NullableText.class);
        }
    }

    /**
     * Sort comparator for {@code NullableDoubleWritable} keys that orders them by
     * their serialized bytes, using the raw-byte {@code compare} inherited from
     * {@link PigWritableComparator}. The comparator-selection switch in this file
     * installs it with {@code job.setSortComparatorClass} for key type code 25.
     */
    public static class PigDoubleWritableComparator extends PigWritableComparator {
        /** Registers {@code NullableDoubleWritable} as the key class. */
        public PigDoubleWritableComparator() {
            super(NullableDoubleWritable.class);
        }
    }

    /**
     * Sort comparator for {@code NullableFloatWritable} keys that orders them by
     * their serialized bytes, using the raw-byte {@code compare} inherited from
     * {@link PigWritableComparator}. The comparator-selection switch in this file
     * installs it with {@code job.setSortComparatorClass} for key type code 20.
     */
    public static class PigFloatWritableComparator extends PigWritableComparator {
        /** Registers {@code NullableFloatWritable} as the key class. */
        public PigFloatWritableComparator() {
            super(NullableFloatWritable.class);
        }
    }

    /**
     * Sort comparator for {@code NullableLongWritable} keys that orders them by
     * their serialized bytes, using the raw-byte {@code compare} inherited from
     * {@link PigWritableComparator}. The comparator-selection switch in this file
     * installs it with {@code job.setSortComparatorClass} for key type code 15.
     */
    public static class PigLongWritableComparator extends PigWritableComparator {
        /** Registers {@code NullableLongWritable} as the key class. */
        public PigLongWritableComparator() {
            super(NullableLongWritable.class);
        }
    }

    /**
     * Sort comparator for {@code NullableIntWritable} keys that orders them by
     * their serialized bytes, using the raw-byte {@code compare} inherited from
     * {@link PigWritableComparator}. The comparator-selection switch in this file
     * installs it with {@code job.setSortComparatorClass} for key type code 10.
     */
    public static class PigIntWritableComparator extends PigWritableComparator {
        /** Registers {@code NullableIntWritable} as the key class. */
        public PigIntWritableComparator() {
            super(NullableIntWritable.class);
        }
    }

    /**
     * Base class for the per-type sort comparators in this file. It orders keys
     * purely by their serialized bytes, delegating to
     * {@link WritableComparator#compareBytes}.
     */
    public static class PigWritableComparator extends WritableComparator {
        /**
         * @param c key class handed to the {@link WritableComparator} constructor
         */
        protected PigWritableComparator(Class c) {
            super(c);
        }

        /**
         * Compares two serialized keys byte by byte.
         *
         * @param b1 buffer holding the first key
         * @param s1 offset of the first key in {@code b1}
         * @param l1 length of the first key
         * @param b2 buffer holding the second key
         * @param s2 offset of the second key in {@code b2}
         * @param l2 length of the second key
         * @return whatever {@code WritableComparator.compareBytes} returns for the two ranges
         */
        public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
            int byteOrder = WritableComparator.compareBytes(b1, s1, l1, b2, s2, l2);
            return byteOrder;
        }
    }

    /**
     * Grouping comparator for compound tuple keys that compares only field 0 of
     * each key tuple, with the keys' index values used as a tie-breaker in the
     * cases described on {@link #compare(WritableComparable, WritableComparable)}.
     */
    public static class PigSecondaryKeyGroupComparator extends WritableComparator {
        /**
         * Registers {@code NullableTuple} as the key class, with the superclass's
         * boolean argument ({@code createInstances}) set to {@code true}.
         */
        public PigSecondaryKeyGroupComparator() {
            super(NullableTuple.class, true);
        }

        /**
         * Orders two {@code PigNullableWritable} keys.
         * <ol>
         * <li>If the first key's index has any bit of {@code 0xFFFFFF80} set and the
         *     low seven index bits differ, those bits decide.</li>
         * <li>Otherwise field 0 of each key tuple is extracted. If neither key
         *     reports {@code isNull()}, the fields are compared with
         *     {@code DataType.compare}. When that yields 0 and both fields are
         *     tuples containing a null field, the difference of the low seven
         *     index bits is returned instead.</li>
         * <li>If either key reports {@code isNull()}: two null fields are ordered by
         *     the low seven index bits; otherwise a null first field sorts first
         *     and any other combination returns 1.</li>
         * </ol>
         *
         * @param a first key; cast to {@code PigNullableWritable}
         * @param b second key; cast to {@code PigNullableWritable}
         * @return negative, zero or positive as described above
         * @throws RuntimeException wrapping an {@code ExecException} raised while
         *         reading a tuple field
         */
        public int compare(WritableComparable a, WritableComparable b) {
            PigNullableWritable left = (PigNullableWritable)a;
            PigNullableWritable right = (PigNullableWritable)b;

            // NOTE(review): 0xFFFFFF80 is presumably a sign-extended byte flag 0x80; confirm against PigNullableWritable.
            if ((left.getIndex() & 0xFFFFFF80) != 0) {
                int byIndex = orderByLowIndexBits(left, right);
                if (byIndex != 0) {
                    return byIndex;
                }
                // Equal low bits: fall through to the key comparison.
            }

            Object leftMain;
            Object rightMain;
            try {
                leftMain = ((Tuple)left.getValueAsPigType()).get(0);
                rightMain = ((Tuple)right.getValueAsPigType()).get(0);
            }
            catch (ExecException e) {
                throw new RuntimeException("Unable to access tuple field", e);
            }

            if (left.isNull() || right.isNull()) {
                if (leftMain == null && rightMain == null) {
                    return orderByLowIndexBits(left, right);
                }
                return leftMain == null ? -1 : 1;
            }

            int result = DataType.compare(leftMain, rightMain);
            boolean bothTuples = leftMain instanceof Tuple && rightMain instanceof Tuple;
            if (result != 0 || !bothTuples) {
                return result;
            }

            // Equal tuple keys: a null field in the right tuple makes the index decide.
            // The loop bound comes from the left tuple; only right-tuple fields are inspected.
            Tuple leftTuple = (Tuple)leftMain;
            Tuple rightTuple = (Tuple)rightMain;
            try {
                for (int pos = 0; pos < leftTuple.size(); ++pos) {
                    if (rightTuple.get(pos) == null) {
                        return (left.getIndex() & 0x7F) - (right.getIndex() & 0x7F);
                    }
                }
            }
            catch (ExecException e) {
                throw new RuntimeException("Unable to access tuple field", e);
            }
            return result;
        }

        /** Three-way comparison (-1, 0 or 1) of the low seven bits of the two keys' index values. */
        private static int orderByLowIndexBits(PigNullableWritable left, PigNullableWritable right) {
            int leftBits = left.getIndex() & 0x7F;
            int rightBits = right.getIndex() & 0x7F;
            if (leftBits < rightBits) {
                return -1;
            }
            if (leftBits > rightBits) {
                return 1;
            }
            return 0;
        }
    }
}

