/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pig.backend.hadoop.executionengine;

import java.io.IOException;
import java.io.PrintStream;
import java.net.Socket;
import java.net.SocketException;
import java.net.SocketImplFactory;
import java.net.URL;
import java.util.ArrayList;
import java.util.Enumeration;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.mapred.JobConf;
import org.apache.pig.ExecType;
import org.apache.pig.FuncSpec;
import org.apache.pig.SortInfo;
import org.apache.pig.backend.datastorage.DataStorage;
import org.apache.pig.backend.executionengine.ExecException;
import org.apache.pig.backend.executionengine.ExecJob;
import org.apache.pig.backend.hadoop.datastorage.ConfigurationUtil;
import org.apache.pig.backend.hadoop.datastorage.HDataStorage;
import org.apache.pig.backend.hadoop.executionengine.HJob;
import org.apache.pig.backend.hadoop.executionengine.MapRedResult;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceLauncher;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.LogToPhyTranslationVisitor;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
import org.apache.pig.backend.hadoop.executionengine.util.MapRedUtil;
import org.apache.pig.impl.PigContext;
import org.apache.pig.impl.io.FileLocalizer;
import org.apache.pig.impl.io.FileSpec;
import org.apache.pig.impl.logicalLayer.FrontendException;
import org.apache.pig.impl.logicalLayer.LogicalPlan;
import org.apache.pig.impl.plan.NodeIdGenerator;
import org.apache.pig.impl.plan.OperatorKey;
import org.apache.pig.impl.util.ObjectSerializer;
import org.apache.pig.impl.util.Utils;
import org.apache.pig.newplan.DependencyOrderWalker;
import org.apache.pig.newplan.Operator;
import org.apache.pig.newplan.OperatorPlan;
import org.apache.pig.newplan.logical.LogicalPlanMigrationVistor;
import org.apache.pig.newplan.logical.expression.ConstantExpression;
import org.apache.pig.newplan.logical.expression.LogicalExpressionPlan;
import org.apache.pig.newplan.logical.optimizer.LogicalPlanOptimizer;
import org.apache.pig.newplan.logical.optimizer.SchemaResetter;
import org.apache.pig.newplan.logical.relational.LOLimit;
import org.apache.pig.newplan.logical.relational.LOSort;
import org.apache.pig.newplan.logical.relational.LOSplit;
import org.apache.pig.newplan.logical.relational.LOSplitOutput;
import org.apache.pig.newplan.logical.relational.LOStore;
import org.apache.pig.newplan.logical.relational.LogicalRelationalNodesVisitor;
import org.apache.pig.newplan.logical.rules.InputOutputFileValidator;
import org.apache.pig.newplan.logical.rules.LoadStoreFuncDupSignatureValidator;
import org.apache.pig.tools.pigstats.OutputStats;
import org.apache.pig.tools.pigstats.PigStats;

/*
 * This class specifies class file version 49.0 but uses Java 6 signatures.  Assumed Java 6.
 */
public class HExecutionEngine {
    public static final String JOB_TRACKER_LOCATION = "mapred.job.tracker";
    private static final String FILE_SYSTEM_LOCATION = "fs.default.name";
    private static final String HADOOP_SITE = "hadoop-site.xml";
    private static final String CORE_SITE = "core-site.xml";
    private final Log log = LogFactory.getLog(this.getClass());
    public static final String LOCAL = "local";
    protected PigContext pigContext;
    protected DataStorage ds;
    protected JobConf jobConf;
    protected Map<OperatorKey, OperatorKey> logicalToPhysicalKeys;
    protected Map<OperatorKey, MapRedResult> materializedResults;

    public HExecutionEngine(PigContext pigContext) {
        this.pigContext = pigContext;
        this.logicalToPhysicalKeys = new HashMap<OperatorKey, OperatorKey>();
        this.materializedResults = new HashMap<OperatorKey, MapRedResult>();
        this.ds = null;
        this.jobConf = null;
    }

    public JobConf getJobConf() {
        return this.jobConf;
    }

    public Map<OperatorKey, MapRedResult> getMaterializedResults() {
        return this.materializedResults;
    }

    public DataStorage getDataStorage() {
        return this.ds;
    }

    public void init() throws ExecException {
        this.init(this.pigContext.getProperties());
    }

    public void init(Properties properties) throws ExecException {
        this.setSSHFactory();
        String cluster = null;
        String nameNode = null;
        Configuration configuration = null;
        JobConf jc = null;
        if (this.pigContext.getExecType() == ExecType.MAPREDUCE) {
            Configuration testConf = new Configuration();
            ClassLoader cl = testConf.getClassLoader();
            URL hadoop_site = cl.getResource(HADOOP_SITE);
            URL core_site = cl.getResource(CORE_SITE);
            if (hadoop_site == null && core_site == null) {
                throw new ExecException("Cannot find hadoop configurations in classpath (neither hadoop-site.xml nor core-site.xml was found in the classpath).If you plan to use local mode, please put -x local option in command line", 4010);
            }
            jc = new JobConf();
            jc.addResource("pig-cluster-hadoop-site.xml");
            new DistributedFileSystem();
            this.recomputeProperties(jc, properties);
        } else {
            jc = new JobConf(false);
            jc.addResource("core-default.xml");
            jc.addResource("mapred-default.xml");
            this.recomputeProperties(jc, properties);
            properties.setProperty(JOB_TRACKER_LOCATION, LOCAL);
            properties.setProperty(FILE_SYSTEM_LOCATION, "file:///");
        }
        cluster = properties.getProperty(JOB_TRACKER_LOCATION);
        nameNode = properties.getProperty(FILE_SYSTEM_LOCATION);
        if (cluster != null && cluster.length() > 0) {
            if (!cluster.contains(":") && !cluster.equalsIgnoreCase(LOCAL)) {
                cluster = cluster + ":50020";
            }
            properties.setProperty(JOB_TRACKER_LOCATION, cluster);
        }
        if (nameNode != null && nameNode.length() > 0) {
            if (!nameNode.contains(":") && !nameNode.equalsIgnoreCase(LOCAL)) {
                nameNode = nameNode + ":8020";
            }
            properties.setProperty(FILE_SYSTEM_LOCATION, nameNode);
        }
        this.log.info((Object)("Connecting to hadoop file system at: " + (nameNode == null ? LOCAL : nameNode)));
        this.ds = new HDataStorage(properties);
        configuration = ConfigurationUtil.toConfiguration(properties);
        if (cluster != null && !cluster.equalsIgnoreCase(LOCAL)) {
            this.log.info((Object)("Connecting to map-reduce job tracker at: " + properties.get(JOB_TRACKER_LOCATION)));
        }
        this.jobConf = new JobConf(configuration);
    }

    public Properties getConfiguration() throws ExecException {
        return this.pigContext.getProperties();
    }

    public void updateConfiguration(Properties newConfiguration) throws ExecException {
        this.init(newConfiguration);
    }

    public void close() throws ExecException {
    }

    public Map<String, Object> getStatistics() throws ExecException {
        throw new UnsupportedOperationException();
    }

    public PhysicalPlan compile(LogicalPlan plan, Properties properties) throws FrontendException {
        if (plan == null) {
            int errCode = 2041;
            String msg = "No Plan to compile";
            throw new FrontendException(msg, errCode, 4);
        }
        try {
            if (this.getConfiguration().getProperty("pig.usenewlogicalplan", "true").equals("true")) {
                this.log.info((Object)"pig.usenewlogicalplan is set to true. New logical plan will be used.");
                LogicalPlanMigrationVistor visitor = new LogicalPlanMigrationVistor(plan);
                visitor.visit();
                org.apache.pig.newplan.logical.relational.LogicalPlan newPlan = visitor.getNewLogicalPlan();
                SchemaResetter schemaResetter = new SchemaResetter(newPlan);
                schemaResetter.visit();
                HashSet optimizerRules = null;
                try {
                    optimizerRules = (HashSet)ObjectSerializer.deserialize(this.pigContext.getProperties().getProperty("pig.optimizer.rules"));
                }
                catch (IOException ioe) {
                    int errCode = 2110;
                    String msg = "Unable to deserialize optimizer rules.";
                    throw new FrontendException(msg, errCode, 4, ioe);
                }
                LoadStoreFuncDupSignatureValidator loadStoreFuncDupSignatureValidator = new LoadStoreFuncDupSignatureValidator(newPlan);
                loadStoreFuncDupSignatureValidator.validate();
                LogicalPlanOptimizer optimizer = new LogicalPlanOptimizer((OperatorPlan)newPlan, 100, optimizerRules);
                optimizer.optimize();
                SortInfoSetter sortInfoSetter = new SortInfoSetter(newPlan);
                sortInfoSetter.visit();
                if (!this.pigContext.inExplain) {
                    InputOutputFileValidator validator = new InputOutputFileValidator(newPlan, this.pigContext);
                    validator.validate();
                }
                org.apache.pig.newplan.logical.relational.LogToPhyTranslationVisitor translator = new org.apache.pig.newplan.logical.relational.LogToPhyTranslationVisitor(newPlan);
                translator.setPigContext(this.pigContext);
                translator.visit();
                return translator.getPhysicalPlan();
            }
            LogToPhyTranslationVisitor translator = new LogToPhyTranslationVisitor(plan);
            translator.setPigContext(this.pigContext);
            translator.visit();
            return translator.getPhysicalPlan();
        }
        catch (Exception ve) {
            int errCode = 2042;
            String msg = "Error in new logical plan. Try -Dpig.usenewlogicalplan=false.";
            throw new FrontendException(msg, errCode, 4, ve);
        }
    }

    public List<ExecJob> execute(PhysicalPlan plan, String jobName) throws ExecException, FrontendException {
        MapReduceLauncher launcher = new MapReduceLauncher();
        ArrayList<HJob> jobs = new ArrayList<HJob>();
        HashMap<String, PhysicalOperator> leafMap = new HashMap<String, PhysicalOperator>();
        for (PhysicalOperator physOp : plan.getLeaves()) {
            FileSpec spec;
            this.log.info((Object)physOp);
            if (!(physOp instanceof POStore) || (spec = ((POStore)physOp).getSFile()) == null) continue;
            leafMap.put(spec.toString(), physOp);
        }
        try {
            PigStats stats = launcher.launchPig(plan, jobName, this.pigContext);
            for (OutputStats output : stats.getOutputStats()) {
                POStore store = output.getPOStore();
                String alias = store.getAlias();
                if (output.isSuccessful()) {
                    jobs.add(new HJob(ExecJob.JOB_STATUS.COMPLETED, this.pigContext, store, alias, stats));
                    continue;
                }
                HJob j = new HJob(ExecJob.JOB_STATUS.FAILED, this.pigContext, store, alias, stats);
                j.setException(launcher.getError(store.getSFile()));
                jobs.add(j);
            }
            ArrayList<HJob> i$ = jobs;
            return i$;
        }
        catch (Exception e) {
            if (e instanceof ExecException) {
                throw (ExecException)e;
            }
            if (e instanceof FrontendException) {
                throw (FrontendException)e;
            }
            int errCode = 2043;
            String msg = "Unexpected error during execution.";
            throw new ExecException(msg, errCode, 4, e);
        }
        finally {
            launcher.reset();
        }
    }

    public void explain(PhysicalPlan plan, PrintStream stream, String format, boolean verbose) {
        try {
            MapRedUtil.checkLeafIsStore(plan, this.pigContext);
            MapReduceLauncher launcher = new MapReduceLauncher();
            launcher.explain(plan, this.pigContext, stream, format, verbose);
        }
        catch (Exception ve) {
            throw new RuntimeException(ve);
        }
    }

    private void setSSHFactory() {
        Properties properties = this.pigContext.getProperties();
        String g = properties.getProperty("ssh.gateway");
        if (g == null || g.length() == 0) {
            return;
        }
        try {
            Class<?> clazz = Class.forName("org.apache.pig.shock.SSHSocketImplFactory");
            SocketImplFactory f = (SocketImplFactory)clazz.getMethod("getFactory", new Class[0]).invoke((Object)0, new Object[0]);
            Socket.setSocketImplFactory(f);
        }
        catch (SocketException e) {
        }
        catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    private void recomputeProperties(JobConf jobConf, Properties properties) {
        if (jobConf != null && properties != null) {
            Properties hadoopProperties = new Properties();
            for (Map.Entry entry : jobConf) {
                hadoopProperties.put(entry.getKey(), entry.getValue());
            }
            Enumeration<Object> propertiesIter = properties.keys();
            while (propertiesIter.hasMoreElements()) {
                String key = (String)propertiesIter.nextElement();
                String val = properties.getProperty(key);
                if (key.equals("user.name")) continue;
                hadoopProperties.put(key, val);
            }
            properties.clear();
            Enumeration<Object> hodPropertiesIter = hadoopProperties.keys();
            while (hodPropertiesIter.hasMoreElements()) {
                String key = (String)hodPropertiesIter.nextElement();
                String val = hadoopProperties.getProperty(key);
                properties.put(key, val);
            }
        }
    }

    public static FileSpec checkLeafIsStore(PhysicalPlan plan, PigContext pigContext) throws ExecException {
        try {
            PhysicalOperator leaf = (PhysicalOperator)plan.getLeaves().get(0);
            FileSpec spec = null;
            if (!(leaf instanceof POStore)) {
                String scope = leaf.getOperatorKey().getScope();
                POStore str = new POStore(new OperatorKey(scope, NodeIdGenerator.getGenerator().getNextNodeId(scope)));
                spec = new FileSpec(FileLocalizer.getTemporaryPath(pigContext).toString(), new FuncSpec(Utils.getTmpFileCompressorName(pigContext)));
                str.setSFile(spec);
                plan.addAsLeaf(str);
            } else {
                spec = ((POStore)leaf).getSFile();
            }
            return spec;
        }
        catch (Exception e) {
            int errCode = 2045;
            String msg = "Internal error. Not able to check if the leaf node is a store operator.";
            throw new ExecException(msg, errCode, 4, e);
        }
    }

    public static class SortInfoSetter
    extends LogicalRelationalNodesVisitor {
        public SortInfoSetter(OperatorPlan plan) throws FrontendException {
            super(plan, new DependencyOrderWalker(plan));
        }

        public void visit(LOStore store) throws FrontendException {
            Operator split;
            Object value;
            Operator root;
            LOSplitOutput splitOutput;
            LogicalExpressionPlan conditionPlan;
            Operator storePred = store.getPlan().getPredecessors(store).get(0);
            if (storePred == null) {
                int errCode = 2051;
                String msg = "Did not find a predecessor for Store.";
                throw new FrontendException(msg, errCode, 4);
            }
            SortInfo sortInfo = null;
            if (storePred instanceof LOLimit) {
                storePred = store.getPlan().getPredecessors(storePred).get(0);
            } else if (storePred instanceof LOSplitOutput && (conditionPlan = (splitOutput = (LOSplitOutput)storePred).getFilterPlan()).getSinks().size() == 1 && (root = conditionPlan.getSinks().get(0)) instanceof ConstantExpression && (value = ((ConstantExpression)root).getValue()) instanceof Boolean && ((Boolean)value).booleanValue() && (split = splitOutput.getPlan().getPredecessors(splitOutput).get(0)) instanceof LOSplit) {
                storePred = store.getPlan().getPredecessors(split).get(0);
            }
            if (storePred instanceof LOSort) {
                try {
                    sortInfo = ((LOSort)storePred).getSortInfo();
                }
                catch (FrontendException e) {
                    throw new FrontendException(e);
                }
            }
            store.setSortInfo(sortInfo);
        }
    }
}

