/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.hbase.regionserver;

import java.io.IOException;
import java.io.InterruptedIOException;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.regionserver.wal.HLogSplitter;
import org.apache.hadoop.hbase.util.CancelableProgressable;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.zookeeper.ZKSplitLog;
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperListener;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
import org.apache.hadoop.util.StringUtils;
import org.apache.zookeeper.AsyncCallback;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.data.Stat;

public class SplitLogWorker
extends ZooKeeperListener
implements Runnable {
    // Logger shared by this class and by the anonymous classes it creates.
    private static final Log LOG = LogFactory.getLog(SplitLogWorker.class);
    // Thread that executes run(); assigned in start() and interrupted by stopTask().
    Thread worker;
    // Name of this server; used in log messages and passed to TaskState.get()/equals()
    // when writing or comparing task ownership data.
    private final String serverName;
    // Executes the actual work for a task once grabTask() has acquired it.
    private final TaskExecutor splitTaskExecutor;
    // Maximum number of attempts getTaskList() makes; read from
    // "hbase.splitlog.zk.retries" (default 3) in the constructor.
    private long zkretries;
    // Monitor that taskLoop() waits on and nodeChildrenChanged() notifies.
    private Object taskReadyLock = new Object();
    // Incremented (under taskReadyLock) by nodeChildrenChanged(); taskLoop() compares it
    // against a snapshot to decide whether the task list must be re-read.
    volatile int taskReadySeq = 0;
    // Znode path of the task currently being grabbed or executed; set in grabTask(),
    // reset to null by endTask().
    private volatile String currentTask = null;
    // Znode version used for the conditional setData() calls on currentTask; refreshed
    // from the Stat returned by each successful ownership update.
    private int currentVersion;
    // Set to true by stop() and to false by start(); checked by the worker thread to
    // decide whether it should exit.
    private volatile boolean exitWorker;
    // Monitor guarding workerInGrabTask and the callbacks that decide whether to
    // interrupt or re-watch the current task.
    private Object grabTaskLock = new Object();
    // True while the worker thread is inside grabTask().
    private boolean workerInGrabTask = false;

    public SplitLogWorker(ZooKeeperWatcher watcher, Configuration conf, String serverName, TaskExecutor splitTaskExecutor) {
        super(watcher);
        this.serverName = serverName;
        this.splitTaskExecutor = splitTaskExecutor;
        this.zkretries = conf.getLong("hbase.splitlog.zk.retries", 3L);
    }

    /**
     * Creates a worker whose executor splits the named log file via
     * {@link HLogSplitter#splitLogFileToTemp}.
     *
     * <p>The executor maps the outcome of the split onto a {@link TaskExecutor.Status}:
     * <ul>
     *   <li>{@code RESIGNED} when the root directory or file system cannot be obtained,
     *       or when the split is interrupted;</li>
     *   <li>{@code PREEMPTED} when the splitter returns {@code false};</li>
     *   <li>{@code ERR} on any other {@link IOException};</li>
     *   <li>{@code DONE} otherwise.</li>
     * </ul>
     *
     * @param watcher ZooKeeper watcher handed to the primary constructor
     * @param conf configuration used to locate the root directory and file system
     * @param serverName name of this server, also used to derive the temporary split name
     */
    public SplitLogWorker(ZooKeeperWatcher watcher, final Configuration conf, final String serverName) {
        this(watcher, conf, serverName, new TaskExecutor(){

            @Override
            public TaskExecutor.Status exec(String filename, CancelableProgressable p) {
                Path rootdir;
                FileSystem fs;
                // Step 1: resolve where the logs live; failure here is not the task's fault,
                // so hand the task back rather than flagging an error.
                try {
                    rootdir = FSUtils.getRootDir(conf);
                    fs = rootdir.getFileSystem(conf);
                } catch (IOException ioe) {
                    LOG.warn("could not find root dir or fs", ioe);
                    return TaskExecutor.Status.RESIGNED;
                }

                // Step 2: run the split and translate its outcome.
                try {
                    String tmpname = ZKSplitLog.getSplitLogDirTmpComponent(serverName, filename);
                    boolean completed = HLogSplitter.splitLogFileToTemp(rootdir, tmpname, fs.getFileStatus(new Path(filename)), fs, conf, p);
                    return completed ? TaskExecutor.Status.DONE : TaskExecutor.Status.PREEMPTED;
                } catch (InterruptedIOException iioe) {
                    LOG.warn("log splitting of " + filename + " interrupted, resigning", iioe);
                    return TaskExecutor.Status.RESIGNED;
                } catch (IOException ioe) {
                    // An IOException wrapping an InterruptedException is treated like an interrupt.
                    if (ioe.getCause() instanceof InterruptedException) {
                        LOG.warn("log splitting of " + filename + " interrupted, resigning", ioe);
                        return TaskExecutor.Status.RESIGNED;
                    }
                    LOG.warn("log splitting of " + filename + " failed, returning error", ioe);
                    return TaskExecutor.Status.ERR;
                }
            }
        });
    }

    /**
     * Entry point of the worker thread.
     *
     * <p>Registers this object as a listener on the watcher, polls once per second until
     * {@code ZKUtil.checkExists} stops returning -1 for the split-log znode, and then
     * enters {@link #taskLoop()}. Any {@link Throwable} escaping is logged rather than
     * propagated, and an "exiting" message is always logged on the way out.
     *
     * <p>The polling loop stops as soon as {@code exitWorker} is set, and
     * {@link #taskLoop()} is skipped in that case. Otherwise a {@link #stop()} issued
     * while waiting for the znode would have its interrupt swallowed by the sleep, and
     * the thread would keep polling and could later block indefinitely in the task loop.
     */
    @Override
    public void run() {
        try {
            LOG.info("SplitLogWorker " + this.serverName + " starting");
            this.watcher.registerListener(this);
            int res = -1;
            while (res == -1 && !this.exitWorker) {
                try {
                    res = ZKUtil.checkExists(this.watcher, this.watcher.splitLogZNode);
                } catch (KeeperException e) {
                    // Transient ZooKeeper failure: leave res at -1 and retry after the sleep.
                    LOG.warn("Exception when checking for " + this.watcher.splitLogZNode + " ... retrying", e);
                }
                if (res != -1) {
                    break;
                }
                try {
                    LOG.info(this.watcher.splitLogZNode + " znode does not exist," + " waiting for master to create one");
                    Thread.sleep(1000L);
                } catch (InterruptedException e) {
                    LOG.debug("Interrupted while waiting for " + this.watcher.splitLogZNode);
                    // The only expected source of an interrupt here is stop(); the loop
                    // condition picks up exitWorker on the next iteration.
                    assert (this.exitWorker);
                }
            }
            if (!this.exitWorker) {
                this.taskLoop();
            }
        } catch (Throwable t) {
            LOG.error("unexpected error ", t);
        } finally {
            LOG.info("SplitLogWorker " + this.serverName + " exiting");
        }
    }

    /**
     * Repeatedly lists the children of the split-log znode and tries to grab each one,
     * then sleeps until {@link #nodeChildrenChanged(String)} signals a change.
     *
     * <p>Returns when the task list cannot be fetched, when {@code exitWorker} is observed
     * after a grab attempt, or when the wait for new tasks is interrupted.
     */
    private void taskLoop() {
        for (;;) {
            // Snapshot the sequence number BEFORE listing, so a change notification that
            // arrives while tasks are being processed is not missed by the wait below.
            final int seqAtListing = this.taskReadySeq;
            final List<String> children = this.getTaskList();
            if (children == null) {
                LOG.warn("Could not get tasks, did someone remove " + this.watcher.splitLogZNode + " ... worker thread exiting.");
                return;
            }

            // Start at a random position and wrap around, visiting every child once.
            final int total = children.size();
            final int start = (int)(Math.random() * (double)total);
            for (int visited = 0; visited < total; visited++) {
                final String child = children.get((visited + start) % total);
                this.grabTask(ZKUtil.joinZNode(this.watcher.splitLogZNode, child));
                if (this.exitWorker) {
                    return;
                }
            }

            synchronized (this.taskReadyLock) {
                while (this.taskReadySeq == seqAtListing) {
                    try {
                        this.taskReadyLock.wait();
                    } catch (InterruptedException ie) {
                        LOG.warn("SplitLogWorker inteurrpted while waiting for task, exiting", ie);
                        assert (this.exitWorker);
                        return;
                    }
                }
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     * Enabled aggressive block sorting
     * Enabled unnecessary exception pruning
     * Enabled aggressive exception aggregation
     */
    private void grabTask(String path) {
        Object object;
        long t;
        Stat stat;
        block38: {
            stat = new Stat();
            t = -1L;
            object = this.grabTaskLock;
            synchronized (object) {
                this.currentTask = path;
                this.workerInGrabTask = true;
                if (Thread.interrupted()) {
                    return;
                }
            }
            try {
                byte[] data = ZKUtil.getDataNoWatch(this.watcher, path, stat);
                if (data == null) {
                    ZKSplitLog.Counters.tot_wkr_failed_to_grab_task_no_data.incrementAndGet();
                    return;
                }
                if (ZKSplitLog.TaskState.TASK_UNASSIGNED.equals(data)) break block38;
            }
            catch (KeeperException e) {
                LOG.warn((Object)("Failed to get data for znode " + path), (Throwable)e);
                ZKSplitLog.Counters.tot_wkr_failed_to_grab_task_exception.incrementAndGet();
                return;
            }
            ZKSplitLog.Counters.tot_wkr_failed_to_grab_task_owned.incrementAndGet();
            return;
        }
        this.currentVersion = stat.getVersion();
        if (!this.attemptToOwnTask(true)) {
            ZKSplitLog.Counters.tot_wkr_failed_to_grab_task_lost_race.incrementAndGet();
            return;
        }
        if (ZKSplitLog.isRescanNode(this.watcher, this.currentTask)) {
            this.endTask(ZKSplitLog.TaskState.TASK_DONE, ZKSplitLog.Counters.tot_wkr_task_acquired_rescan);
            return;
        }
        LOG.info((Object)("worker " + this.serverName + " acquired task " + path));
        ZKSplitLog.Counters.tot_wkr_task_acquired.incrementAndGet();
        this.getDataSetWatchAsync();
        t = System.currentTimeMillis();
        TaskExecutor.Status status = this.splitTaskExecutor.exec(ZKSplitLog.getFileName(this.currentTask), new CancelableProgressable(){

            @Override
            public boolean progress() {
                if (!SplitLogWorker.this.attemptToOwnTask(false)) {
                    LOG.warn((Object)("Failed to heartbeat the task" + SplitLogWorker.this.currentTask));
                    return false;
                }
                return true;
            }
        });
        switch (status) {
            case DONE: {
                this.endTask(ZKSplitLog.TaskState.TASK_DONE, ZKSplitLog.Counters.tot_wkr_task_done);
                return;
            }
            case PREEMPTED: {
                ZKSplitLog.Counters.tot_wkr_preempt_task.incrementAndGet();
                LOG.warn((Object)("task execution prempted " + path));
                return;
            }
            case ERR: {
                if (!this.exitWorker) {
                    this.endTask(ZKSplitLog.TaskState.TASK_ERR, ZKSplitLog.Counters.tot_wkr_task_err);
                    return;
                }
            }
            case RESIGNED: {
                if (this.exitWorker) {
                    LOG.info((Object)("task execution interrupted because worker is exiting " + path));
                    this.endTask(ZKSplitLog.TaskState.TASK_RESIGNED, ZKSplitLog.Counters.tot_wkr_task_resigned);
                    return;
                }
                ZKSplitLog.Counters.tot_wkr_preempt_task.incrementAndGet();
                LOG.info((Object)("task execution interrupted via zk by manager " + path));
                return;
            }
        }
        return;
        finally {
            if (t > 0L) {
                LOG.info((Object)("worker " + this.serverName + " done with task " + path + " in " + (System.currentTimeMillis() - t) + "ms"));
            }
            object = this.grabTaskLock;
            synchronized (object) {
                this.workerInGrabTask = false;
                Thread.interrupted();
            }
        }
    }

    /**
     * Writes this server's {@code TASK_OWNED} marker to the current task's znode, using
     * {@code currentVersion} as the expected version.
     *
     * <p>On success {@code currentVersion} is advanced to the version returned by
     * ZooKeeper and the heartbeat counter is incremented. Every failure path increments
     * the heartbeat-failed counter. A {@link KeeperException} is only logged when this is
     * not the first attempt. An interrupt is logged and the thread's interrupt flag is
     * set again.
     *
     * @param isFirstTime {@code true} for the initial claim, {@code false} for heartbeats
     * @return {@code true} if the znode was updated, {@code false} otherwise
     */
    private boolean attemptToOwnTask(boolean isFirstTime) {
        try {
            final Stat updated = this.watcher.getZooKeeper().setData(this.currentTask, ZKSplitLog.TaskState.TASK_OWNED.get(this.serverName), this.currentVersion);
            if (updated != null) {
                this.currentVersion = updated.getVersion();
                ZKSplitLog.Counters.tot_wkr_task_heartbeat.incrementAndGet();
                return true;
            }
            LOG.warn("zk.setData() returned null for path " + this.currentTask);
        } catch (KeeperException ke) {
            if (!isFirstTime) {
                if (ke.code().equals(KeeperException.Code.NONODE)) {
                    LOG.warn("NONODE failed to assert ownership for " + this.currentTask, ke);
                } else if (ke.code().equals(KeeperException.Code.BADVERSION)) {
                    LOG.warn("BADVERSION failed to assert ownership for " + this.currentTask, ke);
                } else {
                    LOG.warn("failed to assert ownership for " + this.currentTask, ke);
                }
            }
        } catch (InterruptedException ie) {
            LOG.warn("Interrupted while trying to assert ownership of " + this.currentTask + " " + StringUtils.stringifyException(ie));
            Thread.currentThread().interrupt();
        }
        // Shared tail for every unsuccessful path.
        ZKSplitLog.Counters.tot_wkr_task_heartbeat_failed.incrementAndGet();
        return false;
    }

    /**
     * Moves the current task to a final state and forgets it.
     *
     * <p>{@code currentTask} is reset to {@code null} before the write is attempted. If
     * the versioned write succeeds, {@code ctr} is incremented. In every other case
     * (write rejected or a {@link KeeperException}) the reason is logged and the
     * final-transition-failed counter is incremented instead.
     *
     * @param ts final state to write, encoded with this server's name
     * @param ctr counter to increment when the transition succeeds
     */
    private void endTask(ZKSplitLog.TaskState ts, AtomicLong ctr) {
        final String taskPath = this.currentTask;
        this.currentTask = null;

        boolean transitioned = false;
        try {
            transitioned = ZKUtil.setData(this.watcher, taskPath, ts.get(this.serverName), this.currentVersion);
            if (!transitioned) {
                LOG.warn("failed to transistion task " + taskPath + " to end state " + ts + " because of version mismatch ");
            }
        } catch (KeeperException.BadVersionException bve) {
            LOG.warn("transisition task " + taskPath + " to " + ts + " failed because of version mismatch", bve);
        } catch (KeeperException.NoNodeException nne) {
            LOG.fatal("logic error - end task " + taskPath + " " + ts + " failed because task doesn't exist", nne);
        } catch (KeeperException ke) {
            LOG.warn("failed to end task, " + taskPath + " " + ts, ke);
        }

        if (transitioned) {
            LOG.info("successfully transitioned task " + taskPath + " to final state " + ts);
            ctr.incrementAndGet();
            return;
        }
        ZKSplitLog.Counters.tot_wkr_final_transistion_failed.incrementAndGet();
    }

    /**
     * Issues an asynchronous read of the current task's znode, registering the watcher
     * and routing the result to a new {@link GetDataAsyncCallback}; then bumps the
     * "get data queued" counter.
     */
    void getDataSetWatchAsync() {
        final String taskPath = this.currentTask;
        this.watcher.getZooKeeper().getData(taskPath, (Watcher)this.watcher, (AsyncCallback.DataCallback)new GetDataAsyncCallback(), null);
        ZKSplitLog.Counters.tot_wkr_get_data_queued.incrementAndGet();
    }

    /**
     * Handles a successful asynchronous read of a task znode.
     *
     * <p>If the worker is currently inside {@code grabTask} for exactly this path, and
     * the data is none of this server's OWNED / DONE / ERR / RESIGNED markers, the task
     * is considered preempted and the worker thread is interrupted.
     *
     * @param path znode path the data was read from
     * @param data contents of the znode
     */
    void getDataSetWatchSuccess(String path, byte[] data) {
        synchronized (this.grabTaskLock) {
            if (!this.workerInGrabTask) {
                return;
            }
            final String active = this.currentTask;
            if (active == null || !active.equals(path)) {
                return;
            }
            // Any state written by this server means the task has not been taken away.
            final boolean writtenByUs = ZKSplitLog.TaskState.TASK_OWNED.equals(data, this.serverName)
                || ZKSplitLog.TaskState.TASK_DONE.equals(data, this.serverName)
                || ZKSplitLog.TaskState.TASK_ERR.equals(data, this.serverName)
                || ZKSplitLog.TaskState.TASK_RESIGNED.equals(data, this.serverName);
            if (writtenByUs) {
                return;
            }
            LOG.info("task " + active + " preempted from server " + this.serverName + " ... current task state and owner - " + new String(data));
            this.stopTask();
        }
    }

    /**
     * Handles a failed asynchronous read of a task znode by re-issuing the read, but only
     * while the worker is inside {@code grabTask} for that same path.
     *
     * @param path znode path whose read failed
     */
    void getDataSetWatchFailure(String path) {
        synchronized (this.grabTaskLock) {
            if (!this.workerInGrabTask) {
                return;
            }
            final String active = this.currentTask;
            if (active == null || !active.equals(path)) {
                return;
            }
            LOG.info("retrying data watch on " + path);
            ZKSplitLog.Counters.tot_wkr_get_data_retry.incrementAndGet();
            this.getDataSetWatchAsync();
        }
    }

    /**
     * Listener callback for a data change on a znode. When the changed znode is the task
     * the worker is currently handling inside {@code grabTask}, its data is re-read
     * asynchronously (which also re-arms the watch).
     *
     * @param path path of the znode whose data changed
     */
    @Override
    public void nodeDataChanged(String path) {
        synchronized (this.grabTaskLock) {
            if (!this.workerInGrabTask) {
                return;
            }
            final String active = this.currentTask;
            if (active != null && active.equals(path)) {
                this.getDataSetWatchAsync();
            }
        }
    }

    /**
     * Lists the children of the split-log znode, leaving a watch for new children.
     *
     * <p>Makes up to {@code zkretries} attempts, sleeping one second after each
     * {@link KeeperException}. If that sleep is interrupted, the interrupt flag is set
     * again and {@code null} is returned immediately.
     *
     * @return the child names, or {@code null} if every attempt failed or the thread was
     *         interrupted while backing off
     */
    private List<String> getTaskList() {
        for (int attempt = 0; (long)attempt < this.zkretries; attempt++) {
            try {
                return ZKUtil.listChildrenAndWatchForNewChildren(this.watcher, this.watcher.splitLogZNode);
            } catch (KeeperException ke) {
                LOG.warn("Could not get children of znode " + this.watcher.splitLogZNode, ke);
            }
            // Only reached after a failed attempt: back off before the next try.
            try {
                Thread.sleep(1000L);
            } catch (InterruptedException ie) {
                LOG.warn("Interrupted while trying to get task list ...", ie);
                Thread.currentThread().interrupt();
                return null;
            }
        }
        LOG.warn("Tried " + this.zkretries + " times, still couldn't fetch " + "children of " + this.watcher.splitLogZNode + " giving up");
        return null;
    }

    /**
     * Listener callback for a change in a znode's children. A change under the split-log
     * znode advances {@code taskReadySeq} and wakes the thread waiting in
     * {@code taskLoop}; changes under any other path are ignored.
     *
     * @param path path of the znode whose children changed
     */
    @Override
    public void nodeChildrenChanged(String path) {
        if (!path.equals(this.watcher.splitLogZNode)) {
            return;
        }
        LOG.debug("tasks arrived or departed");
        synchronized (this.taskReadyLock) {
            this.taskReadySeq++;
            this.taskReadyLock.notify();
        }
    }

    /**
     * Interrupts the worker thread so that whatever it is currently doing is abandoned.
     *
     * <p>{@code worker} is only assigned by {@link #start()}. If it has not been assigned
     * yet (for example {@link #stop()} invoked before {@link #start()}), there is nothing
     * to interrupt and the call returns after logging, instead of failing with a
     * {@link NullPointerException}.
     */
    void stopTask() {
        final Thread target = this.worker;
        if (target == null) {
            LOG.info("No worker thread to interrupt, worker has not been started");
            return;
        }
        LOG.info("Sending interrupt to stop the worker thread");
        target.interrupt();
    }

    /**
     * Creates the worker thread (named {@code "SplitLogWorker-" + serverName}), clears the
     * exit flag and starts the thread, which then executes {@link #run()}.
     */
    public void start() {
        final String threadName = "SplitLogWorker-" + this.serverName;
        final Thread created = new Thread(null, this, threadName);
        this.worker = created;
        this.exitWorker = false;
        created.start();
    }

    /**
     * Asks the worker to shut down: raises the exit flag first, then interrupts the
     * worker thread via {@link #stopTask()} so that it notices the flag.
     */
    public void stop() {
        exitWorker = true;
        stopTask();
    }

    public static interface TaskExecutor {
        public Status exec(String var1, CancelableProgressable var2);

        public static enum Status {
            DONE,
            ERR,
            RESIGNED,
            PREEMPTED;

        }
    }

    /**
     * Callback for the asynchronous znode reads issued by {@code getDataSetWatchAsync}.
     * Counts every result and forwards it to the enclosing worker's success or failure
     * handler.
     */
    class GetDataAsyncCallback
    implements AsyncCallback.DataCallback {
        private final Log LOG = LogFactory.getLog(GetDataAsyncCallback.class);

        GetDataAsyncCallback() {
        }

        /**
         * Dispatches the read result: a zero return code goes to the success handler,
         * anything else is logged and goes to the failure handler.
         *
         * @param rc ZooKeeper return code of the read
         * @param path znode path that was read
         * @param ctx context object supplied with the request (unused)
         * @param data znode contents on success
         * @param stat znode metadata on success (unused)
         */
        public void processResult(int rc, String path, Object ctx, byte[] data, Stat stat) {
            ZKSplitLog.Counters.tot_wkr_get_data_result.incrementAndGet();
            if (rc == 0) {
                SplitLogWorker.this.getDataSetWatchSuccess(path, data);
                return;
            }
            this.LOG.warn("getdata rc = " + KeeperException.Code.get((int)rc) + " " + path);
            SplitLogWorker.this.getDataSetWatchFailure(path);
        }
    }
}

