/*
 * Decompiled with CFR 0.152.
 */
package org.apache.oozie.service;

import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.oozie.CoordinatorActionBean;
import org.apache.oozie.ErrorCode;
import org.apache.oozie.WorkflowActionBean;
import org.apache.oozie.command.CommandException;
import org.apache.oozie.command.coord.CoordActionCheckCommand;
import org.apache.oozie.command.coord.CoordActionCheckXCommand;
import org.apache.oozie.command.wf.ActionCheckCommand;
import org.apache.oozie.command.wf.ActionCheckXCommand;
import org.apache.oozie.executor.jpa.CoordActionsRunningGetJPAExecutor;
import org.apache.oozie.executor.jpa.JPAExecutorException;
import org.apache.oozie.executor.jpa.WorkflowActionsRunningGetJPAExecutor;
import org.apache.oozie.service.CallableQueueService;
import org.apache.oozie.service.InstrumentationService;
import org.apache.oozie.service.JPAService;
import org.apache.oozie.service.SchedulerService;
import org.apache.oozie.service.Service;
import org.apache.oozie.service.Services;
import org.apache.oozie.util.XCallable;
import org.apache.oozie.util.XLog;

/**
 * Service that periodically fetches running workflow actions and running
 * coordinator actions through the {@link JPAService} and queues a "check"
 * command for each of them on the {@link CallableQueueService}.
 *
 * <p>The periodic work is performed by {@link ActionCheckRunnable}, which
 * {@link #init(Services)} registers with the {@link SchedulerService}.
 */
public class ActionCheckerService
implements Service {
    public static final String CONF_PREFIX = "oozie.service.ActionCheckerService.";
    /** Interval, in seconds, between two runs of the checker (default 60). */
    public static final String CONF_ACTION_CHECK_INTERVAL = "oozie.service.ActionCheckerService.action.check.interval";
    /**
     * Delay handed to the "running actions" JPA executors and to the coordinator
     * check commands (default 600). NOTE(review): presumably seconds since the
     * last check of an action - confirm against the JPA executors.
     */
    public static final String CONF_ACTION_CHECK_DELAY = "oozie.service.ActionCheckerService.action.check.delay";
    /** Number of callables collected before they are queued as one serial batch (default 10). */
    public static final String CONF_CALLABLE_BATCH_SIZE = "oozie.service.ActionCheckerService.callable.batch.size";
    protected static final String INSTRUMENTATION_GROUP = "actionchecker";
    protected static final String INSTR_CHECK_ACTIONS_COUNTER = "checks_wf_actions";
    protected static final String INSTR_CHECK_COORD_ACTIONS_COUNTER = "checks_coord_actions";
    // Written by init() and read by the scheduled runnable on another thread,
    // hence volatile.
    private static volatile boolean useXCommand = true;

    /**
     * Reads the checker configuration and schedules the periodic check.
     *
     * <p>The first run happens 10 seconds after this call; subsequent runs are
     * scheduled with the interval configured under
     * {@link #CONF_ACTION_CHECK_INTERVAL}.
     *
     * @param services services instance supplying the configuration and the scheduler.
     */
    @Override
    public void init(Services services) {
        Configuration conf = services.getConf();
        // Assign the flag unconditionally, and before scheduling: the previous
        // "only ever set to false" logic left the static stuck at false once any
        // earlier init() in the same JVM had disabled XCommands.
        useXCommand = Services.get().getConf().getBoolean("oozie.useXCommand", true);
        int checkDelay = conf.getInt(CONF_ACTION_CHECK_DELAY, 600);
        long intervalSecs = conf.getInt(CONF_ACTION_CHECK_INTERVAL, 60);
        ActionCheckRunnable actionCheckRunnable = new ActionCheckRunnable(checkDelay);
        services.get(SchedulerService.class).schedule(actionCheckRunnable, 10L, intervalSecs, SchedulerService.Unit.SEC);
    }

    /**
     * Nothing to release here; this class keeps no handle on the scheduled task.
     */
    @Override
    public void destroy() {
    }

    /**
     * @return the class under which this service is registered.
     */
    @Override
    public Class<? extends Service> getInterface() {
        return ActionCheckerService.class;
    }

    /**
     * Runnable executed by the scheduler: collects check commands for running
     * workflow and coordinator actions and queues them in serial batches.
     *
     * <p>Instances keep per-run state in fields and are therefore not meant to
     * be run concurrently with themselves.
     */
    static class ActionCheckRunnable
    implements Runnable {
        private static final String QUEUE_FULL_MSG = "Unable to queue the callables commands for CheckerService. Most possibly command queue is full. Queue size is :";

        private final int actionCheckDelay;
        // Callables collected but not yet queued; null when nothing is pending.
        private List<XCallable<Void>> callables;
        // Summary of what the current run found, used for the debug log line.
        private StringBuilder msg = null;
        // Batch size for the current run, re-read from the configuration on every run.
        private int callableBatchSize = 10;

        /**
         * @param actionCheckDelay delay value forwarded to the JPA executors and
         *        to the coordinator check commands.
         */
        public ActionCheckRunnable(int actionCheckDelay) {
            this.actionCheckDelay = actionCheckDelay;
        }

        /**
         * Performs one checker pass: workflow actions first, then coordinator
         * actions, then queues whatever is left of the last partial batch.
         *
         * <p>A {@link CommandException} from either check is logged and ends the
         * collection phase; callables gathered before the failure are still queued.
         */
        @Override
        public void run() {
            XLog.Info.get().clear();
            XLog LOG = XLog.getLog(this.getClass());
            this.msg = new StringBuilder();
            // Looked up once per run instead of once per queued callable.
            this.callableBatchSize = Services.get().getConf().getInt(ActionCheckerService.CONF_CALLABLE_BATCH_SIZE, 10);
            try {
                this.runWFActionCheck();
                this.runCoordActionCheck();
            }
            catch (CommandException ce) {
                // The cast keeps this call bound to the (Object, Throwable) logging
                // overload, exactly as in the original code.
                LOG.error((Object)"Unable to run action checks, ", ce);
            }
            LOG.debug("QUEUING [{0}] for potential checking", this.msg.toString());
            this.flushCallables();
        }

        /**
         * Queues an action check command for every running workflow action
         * returned by {@link WorkflowActionsRunningGetJPAExecutor}.
         *
         * @throws CommandException if the JPA service is unavailable or the query fails.
         */
        private void runWFActionCheck() throws CommandException {
            List<WorkflowActionBean> actions;
            JPAService jpaService = this.requireJpaService();
            try {
                actions = jpaService.execute(new WorkflowActionsRunningGetJPAExecutor(this.actionCheckDelay));
            }
            catch (JPAExecutorException je) {
                throw new CommandException(je);
            }
            if (actions == null || actions.isEmpty()) {
                return;
            }
            this.msg.append(" WF_ACTIONS : ").append(actions.size());
            // Service lookup hoisted out of the loop; the counter is still bumped once per action.
            InstrumentationService instrumentation = Services.get().get(InstrumentationService.class);
            for (WorkflowActionBean action : actions) {
                instrumentation.get().incr(ActionCheckerService.INSTRUMENTATION_GROUP, ActionCheckerService.INSTR_CHECK_ACTIONS_COUNTER, 1L);
                if (useXCommand) {
                    this.queueCallable(new ActionCheckXCommand(action.getId()));
                }
                else {
                    this.queueCallable(new ActionCheckCommand(action.getId()));
                }
            }
        }

        /**
         * Queues a coordinator action check command for every running
         * coordinator action returned by {@link CoordActionsRunningGetJPAExecutor}.
         *
         * @throws CommandException if the JPA service is unavailable or the query fails.
         */
        private void runCoordActionCheck() throws CommandException {
            List<CoordinatorActionBean> cactions;
            JPAService jpaService = this.requireJpaService();
            try {
                cactions = jpaService.execute(new CoordActionsRunningGetJPAExecutor(this.actionCheckDelay));
            }
            catch (JPAExecutorException je) {
                throw new CommandException(je);
            }
            if (cactions == null || cactions.isEmpty()) {
                return;
            }
            this.msg.append(" COORD_ACTIONS : ").append(cactions.size());
            // Service lookup hoisted out of the loop; the counter is still bumped once per action.
            InstrumentationService instrumentation = Services.get().get(InstrumentationService.class);
            for (CoordinatorActionBean caction : cactions) {
                instrumentation.get().incr(ActionCheckerService.INSTRUMENTATION_GROUP, ActionCheckerService.INSTR_CHECK_COORD_ACTIONS_COUNTER, 1L);
                if (useXCommand) {
                    this.queueCallable(new CoordActionCheckXCommand(caction.getId(), this.actionCheckDelay));
                }
                else {
                    this.queueCallable(new CoordActionCheckCommand(caction.getId(), this.actionCheckDelay));
                }
            }
        }

        /**
         * Returns the JPA service, failing with error E0610 when it is not registered.
         *
         * @throws CommandException if no {@link JPAService} is available.
         */
        private JPAService requireJpaService() throws CommandException {
            JPAService jpaService = Services.get().get(JPAService.class);
            if (jpaService == null) {
                throw new CommandException(ErrorCode.E0610, new Object[0]);
            }
            return jpaService;
        }

        /**
         * Adds a callable to the pending batch and queues the batch as soon as it
         * reaches the batch size read at the start of the current run.
         *
         * @param callable the check command to enqueue.
         */
        private void queueCallable(XCallable<Void> callable) {
            if (this.callables == null) {
                this.callables = new ArrayList<XCallable<Void>>();
            }
            this.callables.add(callable);
            // Deliberately '==': a non-positive batch size disables intermediate
            // flushes, leaving everything for the final flush in run().
            if (this.callables.size() == this.callableBatchSize) {
                this.flushCallables();
            }
        }

        /**
         * Hands the pending batch, if any, to the callable queue for serial
         * execution and logs a warning when the queue rejects it.
         *
         * <p>The pending list is detached before queuing so it is reset even if
         * queuing throws, and an empty batch is never submitted.
         */
        private void flushCallables() {
            List<XCallable<Void>> batch = this.callables;
            this.callables = null;
            if (batch == null || batch.isEmpty()) {
                return;
            }
            CallableQueueService queueService = Services.get().get(CallableQueueService.class);
            boolean queued = queueService.queueSerial(batch);
            if (!queued) {
                XLog.getLog(this.getClass()).warn(QUEUE_FULL_MSG + queueService.queueSize());
            }
        }
    }
}

