package org.apache.hadoop.hdfs.nfs.nfs3;

import java.io.File;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.security.InvalidParameterException;
import java.util.Iterator;
import java.util.SortedMap;
import java.util.TreeMap;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.ftp.FTPFileSystem;
import org.apache.hadoop.hdfs.DFSClient;
import org.apache.hadoop.hdfs.nfs.nfs3.AsyncDataService;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.nfs.nfs3.FileHandle;
import org.apache.hadoop.nfs.nfs3.IdUserGroup;
import org.apache.hadoop.nfs.nfs3.Nfs3Constant;
import org.apache.hadoop.nfs.nfs3.Nfs3FileAttributes;
import org.apache.hadoop.nfs.nfs3.Nfs3Status;
import org.apache.hadoop.nfs.nfs3.request.WRITE3Request;
import org.apache.hadoop.nfs.nfs3.response.WRITE3Response;
import org.apache.hadoop.nfs.nfs3.response.WccAttr;
import org.apache.hadoop.nfs.nfs3.response.WccData;
import org.apache.hadoop.oncrpc.XDR;
import org.apache.hadoop.util.StringUtils;
import org.jboss.netty.channel.Channel;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:org/apache/hadoop/hdfs/nfs/nfs3/OpenFileCtx.class */
public class OpenFileCtx {
    public static final Log LOG;
    private final ReentrantLock ctxLock;
    private boolean activeState;
    private boolean asyncStatus;
    private final FSDataOutputStream fos;
    private final Nfs3FileAttributes latestAttr;
    private long nextOffset;
    private final SortedMap<OffsetRange, WriteCtx> pendingWrites = new TreeMap();
    private long lastAccessTime;
    private static int DUMP_WRITE_WATER_MARK;
    private FileOutputStream dumpOut;
    private long nonSequentialWriteInMemory;
    private boolean enabledDump;
    private RandomAccessFile raf;
    private final String dumpFilePath;
    public static final int COMMIT_FINISHED = 0;
    public static final int COMMIT_WAIT = 1;
    public static final int COMMIT_INACTIVE_CTX = 2;
    public static final int COMMIT_ERROR = 3;
    static final /* synthetic */ boolean $assertionsDisabled;

    private void updateLastAccessTime() {
        this.lastAccessTime = System.currentTimeMillis();
    }

    private boolean checkStreamTimeout(long j) {
        return System.currentTimeMillis() - this.lastAccessTime > j;
    }

    private long updateNonSequentialWriteInMemory(long j) {
        this.nonSequentialWriteInMemory += j;
        if (LOG.isDebugEnabled()) {
            LOG.debug("Update nonSequentialWriteInMemory by " + j + " new value:" + this.nonSequentialWriteInMemory);
        }
        if (this.nonSequentialWriteInMemory < 0) {
            throw new InvalidParameterException("nonSequentialWriteInMemory is negative after update with count " + j);
        }
        return this.nonSequentialWriteInMemory;
    }

    SortedMap<OffsetRange, WriteCtx> getPendingWrites() {
        return this.pendingWrites;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public OpenFileCtx(FSDataOutputStream fSDataOutputStream, Nfs3FileAttributes nfs3FileAttributes, String str) {
        this.fos = fSDataOutputStream;
        this.latestAttr = nfs3FileAttributes;
        updateLastAccessTime();
        this.activeState = true;
        this.asyncStatus = false;
        this.dumpOut = null;
        this.raf = null;
        this.nonSequentialWriteInMemory = 0L;
        this.dumpFilePath = str;
        this.enabledDump = str != null;
        this.ctxLock = new ReentrantLock();
    }

    public Nfs3FileAttributes copyLatestAttr() {
        this.ctxLock.lock();
        Nfs3FileAttributes nfs3FileAttributes = new Nfs3FileAttributes(this.latestAttr);
        this.ctxLock.unlock();
        return nfs3FileAttributes;
    }

    public long getNextOffset() {
        this.ctxLock.lock();
        long j = this.nextOffset;
        this.ctxLock.unlock();
        return j;
    }

    private long getFlushedOffset() throws IOException {
        return this.fos.getPos();
    }

    private void checkDump(long j) {
        if (!$assertionsDisabled && !this.ctxLock.isLocked()) {
            throw new AssertionError();
        }
        if (!this.enabledDump) {
            if (LOG.isDebugEnabled()) {
                LOG.debug("Do nothing, dump is disabled.");
                return;
            }
            return;
        }
        updateNonSequentialWriteInMemory(j);
        if (this.nonSequentialWriteInMemory < DUMP_WRITE_WATER_MARK) {
            return;
        }
        if (this.dumpOut == null) {
            LOG.info("Create dump file:" + this.dumpFilePath);
            File file = new File(this.dumpFilePath);
            try {
                if (file.exists()) {
                    throw new RuntimeException("The dump file should not exist:" + this.dumpFilePath);
                }
                this.dumpOut = new FileOutputStream(file);
                if (file.createNewFile()) {
                    LOG.error("Can't create dump file:" + this.dumpFilePath);
                }
            } catch (IOException e) {
                LOG.error("Got failure when creating dump stream " + this.dumpFilePath + " with error:" + e);
                this.enabledDump = false;
                if (this.dumpOut != null) {
                    try {
                        this.dumpOut.close();
                        return;
                    } catch (IOException e2) {
                        LOG.error("Can't close dump stream " + this.dumpFilePath + " with error:" + e);
                        return;
                    }
                }
                return;
            }
        }
        if (this.raf == null) {
            try {
                this.raf = new RandomAccessFile(this.dumpFilePath, "r");
            } catch (FileNotFoundException e3) {
                LOG.error("Can't get random access to file " + this.dumpFilePath);
                this.enabledDump = false;
                return;
            }
        }
        if (LOG.isDebugEnabled()) {
            LOG.debug("Start dump, current write number:" + this.pendingWrites.size());
        }
        Iterator<OffsetRange> it = this.pendingWrites.keySet().iterator();
        while (it.hasNext()) {
            WriteCtx writeCtx = this.pendingWrites.get(it.next());
            try {
                long dumpData = writeCtx.dumpData(this.dumpOut, this.raf);
                if (dumpData > 0) {
                    if (dumpData != writeCtx.getCount()) {
                        throw new RuntimeException("Dumped size, " + dumpData + ", is not write size:" + writeCtx.getCount());
                    }
                    updateNonSequentialWriteInMemory(-dumpData);
                    Nfs3Utils.writeChannel(writeCtx.getChannel(), new WRITE3Response(Nfs3Status.NFS3_OK, new WccData(this.latestAttr.getWccAttr(), this.latestAttr), (int) dumpData, writeCtx.getStableHow(), Nfs3Constant.WRITE_COMMIT_VERF).send(new XDR(), writeCtx.getXid()));
                    writeCtx.setReplied(true);
                }
            } catch (IOException e4) {
                LOG.error("Dump data failed:" + writeCtx + " with error:" + e4);
                this.enabledDump = false;
                return;
            }
        }
        if (this.nonSequentialWriteInMemory != 0) {
            throw new RuntimeException("After dump, nonSequentialWriteInMemory is not zero: " + this.nonSequentialWriteInMemory);
        }
    }

    private boolean checkRepeatedWriteRequest(WRITE3Request wRITE3Request, Channel channel, int i) {
        WriteCtx writeCtx = this.pendingWrites.get(new OffsetRange(wRITE3Request.getOffset(), wRITE3Request.getOffset() + wRITE3Request.getCount()));
        if (writeCtx == null) {
            return false;
        }
        if (i == writeCtx.getXid()) {
            return true;
        }
        LOG.warn("Got a repeated request, same range, with a different xid:" + i + " xid in old request:" + writeCtx.getXid());
        return true;
    }

    public void receivedNewWrite(DFSClient dFSClient, WRITE3Request wRITE3Request, Channel channel, int i, AsyncDataService asyncDataService, IdUserGroup idUserGroup) {
        long offset = wRITE3Request.getOffset();
        int count = wRITE3Request.getCount();
        Nfs3Constant.WriteStableHow stableHow = wRITE3Request.getStableHow();
        this.ctxLock.lock();
        if (!this.activeState) {
            LOG.info("OpenFileCtx is inactive, fileId:" + wRITE3Request.getHandle().getFileId());
            Nfs3Utils.writeChannel(channel, new WRITE3Response(Nfs3Status.NFS3ERR_IO, new WccData(this.latestAttr.getWccAttr(), this.latestAttr), 0, stableHow, Nfs3Constant.WRITE_COMMIT_VERF).send(new XDR(), i));
            return;
        }
        if (checkRepeatedWriteRequest(wRITE3Request, channel, i)) {
            LOG.debug("Repeated unstable write request: xid=" + i + " reqeust:" + wRITE3Request + " just drop it.");
            updateLastAccessTime();
            this.ctxLock.lock();
            return;
        }
        WccAttr wccAttr = this.latestAttr.getWccAttr();
        LOG.info("requesed offset=" + offset + " and current filesize=" + wccAttr.getSize());
        long nextOffset = getNextOffset();
        if (offset == nextOffset) {
            LOG.info("Add to the list, update nextOffset and notify the writer, nextOffset:" + nextOffset);
            addWrite(new WriteCtx(wRITE3Request.getHandle(), wRITE3Request.getOffset(), wRITE3Request.getCount(), wRITE3Request.getStableHow(), wRITE3Request.getData(), channel, i, true, 1));
            long j = offset + count;
            if (!this.asyncStatus) {
                this.asyncStatus = true;
                asyncDataService.execute(new AsyncDataService.WriteBackTask(this));
            }
            updateLastAccessTime();
            this.ctxLock.unlock();
            if (wRITE3Request.getStableHow() == Nfs3Constant.WriteStableHow.UNSTABLE) {
                Nfs3Utils.writeChannel(channel, new WRITE3Response(Nfs3Status.NFS3_OK, new WccData(wccAttr, this.latestAttr), count, stableHow, Nfs3Constant.WRITE_COMMIT_VERF).send(new XDR(), i));
                return;
            }
            return;
        }
        if (offset <= nextOffset) {
            WccData wccData = new WccData(wccAttr, null);
            WRITE3Response wRITE3Response = offset + ((long) count) > nextOffset ? new WRITE3Response(Nfs3Status.NFS3ERR_INVAL, wccData, 0, Nfs3Constant.WriteStableHow.UNSTABLE, 0L) : processPerfectOverWrite(dFSClient, offset, count, stableHow, wRITE3Request.getData(), Nfs3Utils.getFileIdPath(wRITE3Request.getHandle()), wccData, idUserGroup);
            updateLastAccessTime();
            this.ctxLock.unlock();
            Nfs3Utils.writeChannel(channel, wRITE3Response.send(new XDR(), i));
            return;
        }
        LOG.info("Add new write to the list but not update nextOffset:" + nextOffset);
        addWrite(new WriteCtx(wRITE3Request.getHandle(), wRITE3Request.getOffset(), wRITE3Request.getCount(), wRITE3Request.getStableHow(), wRITE3Request.getData(), channel, i, true, 0));
        checkDump(wRITE3Request.getCount());
        updateLastAccessTime();
        this.ctxLock.unlock();
        if (wRITE3Request.getStableHow() == Nfs3Constant.WriteStableHow.UNSTABLE) {
            Nfs3Utils.writeChannel(channel, new WRITE3Response(Nfs3Status.NFS3_OK, new WccData(wccAttr, this.latestAttr), count, stableHow, Nfs3Constant.WRITE_COMMIT_VERF).send(new XDR(), i));
        }
    }

    private WRITE3Response processPerfectOverWrite(DFSClient dFSClient, long j, int i, Nfs3Constant.WriteStableHow writeStableHow, byte[] bArr, String str, WccData wccData, IdUserGroup idUserGroup) {
        WRITE3Response wRITE3Response;
        if (!$assertionsDisabled && !this.ctxLock.isLocked()) {
            throw new AssertionError();
        }
        byte[] bArr2 = new byte[i];
        DFSClient.DFSDataInputStream dFSDataInputStream = null;
        try {
            try {
                dFSDataInputStream = new DFSClient.DFSDataInputStream(dFSClient.open(str));
                int read = dFSDataInputStream.read((int) j, bArr2, 0, i);
                if (read < i) {
                    LOG.error("Can't read back " + i + " bytes, partial read size:" + read);
                    WRITE3Response wRITE3Response2 = new WRITE3Response(Nfs3Status.NFS3ERR_IO, wccData, 0, writeStableHow, Nfs3Constant.WRITE_COMMIT_VERF);
                    if (dFSDataInputStream != null) {
                        try {
                            dFSDataInputStream.close();
                        } catch (IOException e) {
                            LOG.error("Can't close inputstream for " + str + " error:" + e);
                        }
                    }
                    return wRITE3Response2;
                }
                if (dFSDataInputStream != null) {
                    try {
                        dFSDataInputStream.close();
                    } catch (IOException e2) {
                        LOG.error("Can't close inputstream for " + str + " error:" + e2);
                    }
                }
                if (new BytesWritable.Comparator().compare(bArr2, 0, read, bArr, 0, i) != 0) {
                    wRITE3Response = new WRITE3Response(Nfs3Status.NFS3ERR_INVAL, wccData, 0, writeStableHow, 0L);
                } else {
                    try {
                        dFSClient.setTimes(str, System.currentTimeMillis(), -1L);
                        wccData.setPostOpAttr(Nfs3Utils.getFileAttr(dFSClient, str, idUserGroup));
                        wRITE3Response = new WRITE3Response(Nfs3Status.NFS3_OK, wccData, i, writeStableHow, 0L);
                    } catch (IOException e3) {
                        LOG.info("Got error when processing perfect overwrite, path=" + str + " error:" + e3);
                        return new WRITE3Response(Nfs3Status.NFS3ERR_IO, wccData, 0, writeStableHow, 0L);
                    }
                }
                return wRITE3Response;
            } catch (IOException e4) {
                LOG.info("Read failed when processing possible perfect overwrite, path=" + str + " error:" + e4);
                WRITE3Response wRITE3Response3 = new WRITE3Response(Nfs3Status.NFS3ERR_IO, wccData, 0, writeStableHow, Nfs3Constant.WRITE_COMMIT_VERF);
                if (dFSDataInputStream != null) {
                    try {
                        dFSDataInputStream.close();
                    } catch (IOException e5) {
                        LOG.error("Can't close inputstream for " + str + " error:" + e5);
                    }
                }
                return wRITE3Response3;
            }
        } catch (Throwable th) {
            if (dFSDataInputStream != null) {
                try {
                    dFSDataInputStream.close();
                } catch (IOException e6) {
                    LOG.error("Can't close inputstream for " + str + " error:" + e6);
                }
            }
            throw th;
        }
    }

    public int checkCommit(long j) {
        int i;
        this.ctxLock.lock();
        if (!this.activeState) {
            this.ctxLock.unlock();
            return 2;
        }
        if (j == 0) {
            j = getNextOffset();
        }
        try {
            long flushedOffset = getFlushedOffset();
            LOG.info("getFlushedOffset=" + flushedOffset + " commitOffset=" + j);
            if (flushedOffset < j) {
                updateLastAccessTime();
                this.ctxLock.unlock();
                return 1;
            }
            try {
                this.fos.sync();
                i = 0;
            } catch (IOException e) {
                LOG.error("Got stream error during data sync:" + e);
                i = 3;
            }
            updateLastAccessTime();
            this.ctxLock.unlock();
            return i;
        } catch (IOException e2) {
            LOG.error("Can't get flushed offset, error:" + e2);
            this.ctxLock.unlock();
            return 3;
        }
    }

    private void addWrite(WriteCtx writeCtx) {
        long offset = writeCtx.getOffset();
        int count = writeCtx.getCount();
        this.ctxLock.lock();
        getPendingWrites().put(new OffsetRange(offset, offset + count), writeCtx);
        this.ctxLock.unlock();
    }

    public boolean streamCleanup(long j, long j2) {
        if (j2 < WriteManager.MINIMIUM_STREAM_TIMEOUT) {
            throw new InvalidParameterException("StreamTimeout" + j2 + "ms is less than MINIMIUM_STREAM_TIMEOUT " + WriteManager.MINIMIUM_STREAM_TIMEOUT + "ms");
        }
        if (!this.ctxLock.tryLock()) {
            return false;
        }
        boolean z = false;
        if (checkStreamTimeout(j2)) {
            LOG.info("closing stream for fileId:" + j);
            cleanup();
            z = true;
        }
        this.ctxLock.unlock();
        return z;
    }

    public void executeWriteBack() {
        this.ctxLock.lock();
        try {
            try {
                if (!this.asyncStatus) {
                    LOG.fatal("The openFileCtx has false async status");
                    System.exit(-1);
                }
                if (getPendingWrites().isEmpty()) {
                    LOG.fatal("The asyn write task has no pendding writes! fileId:" + this.latestAttr.getFileId());
                    System.exit(-2);
                }
                doWrites();
                this.asyncStatus = false;
                this.ctxLock.unlock();
            } catch (IOException e) {
                LOG.info("got exception when writing back:" + e);
                this.asyncStatus = false;
                this.ctxLock.unlock();
            }
        } catch (Throwable th) {
            this.asyncStatus = false;
            this.ctxLock.unlock();
            throw th;
        }
    }

    private void doSingleWrite(WriteCtx writeCtx) {
        if (!$assertionsDisabled && !this.ctxLock.isLocked()) {
            throw new AssertionError();
        }
        Channel channel = writeCtx.getChannel();
        int xid = writeCtx.getXid();
        long offset = writeCtx.getOffset();
        int count = writeCtx.getCount();
        Nfs3Constant.WriteStableHow stableHow = writeCtx.getStableHow();
        byte[] bArr = null;
        try {
            bArr = writeCtx.getData();
        } catch (IOException e) {
            LOG.error("Failed to get request data offset:" + offset + " count:" + count + " error:" + e);
            cleanup();
        }
        if (!$assertionsDisabled && bArr.length != count) {
            throw new AssertionError();
        }
        FileHandle handle = writeCtx.getHandle();
        LOG.info("do write, fileId: " + handle.getFileId() + " offset: " + offset + " length:" + count + " stableHow:" + stableHow.getValue());
        try {
            this.fos.write(bArr, 0, count);
            if (this.fos.getPos() != offset + count) {
                throw new IOException("output stream is out of sync, pos=" + this.fos.getPos() + " and nextOffset should be" + (offset + count));
            }
            this.nextOffset = this.fos.getPos();
            if (writeCtx.getDataState() == 0) {
                updateNonSequentialWriteInMemory(-count);
            }
            if (!writeCtx.getReplied()) {
                Nfs3Utils.writeChannel(channel, new WRITE3Response(Nfs3Status.NFS3_OK, new WccData(this.latestAttr.getWccAttr(), this.latestAttr), count, stableHow, Nfs3Constant.WRITE_COMMIT_VERF).send(new XDR(), xid));
            }
        } catch (IOException e2) {
            LOG.error("Error writing to fileId " + handle.getFileId() + " at offset " + offset + " and length " + bArr.length, e2);
            if (!writeCtx.getReplied()) {
                Nfs3Utils.writeChannel(channel, new WRITE3Response(Nfs3Status.NFS3ERR_IO).send(new XDR(), xid));
            }
            LOG.info("Clean up open file context for fileId: " + this.latestAttr.getFileid());
            cleanup();
        }
    }

    private void cleanup() {
        this.activeState = false;
        try {
            if (this.fos != null) {
                this.fos.close();
            }
        } catch (IOException e) {
            LOG.info("Can't close stream for fileId:" + this.latestAttr.getFileid() + ", error:" + e);
        }
        LOG.info("There are " + this.pendingWrites.size() + " pending writes.");
        WccAttr wccAttr = this.latestAttr.getWccAttr();
        while (!this.pendingWrites.isEmpty()) {
            OffsetRange firstKey = this.pendingWrites.firstKey();
            LOG.info("Fail pending write: (" + firstKey.getMin() + StringUtils.COMMA_STR + firstKey.getMax() + "), nextOffset=" + getNextOffset());
            WriteCtx remove = this.pendingWrites.remove(firstKey);
            Nfs3Utils.writeChannel(remove.getChannel(), new WRITE3Response(Nfs3Status.NFS3ERR_IO, new WccData(wccAttr, this.latestAttr), 0, remove.getStableHow(), Nfs3Constant.WRITE_COMMIT_VERF).send(new XDR(), remove.getXid()));
        }
        if (this.dumpOut != null) {
            try {
                this.dumpOut.close();
            } catch (IOException e2) {
                e2.printStackTrace();
            }
        }
        if (this.raf != null) {
            try {
                this.raf.close();
            } catch (IOException e3) {
                e3.printStackTrace();
            }
        }
        new File(this.dumpFilePath).delete();
    }

    private void doWrites() throws IOException {
        if (!$assertionsDisabled && !this.ctxLock.isLocked()) {
            throw new AssertionError();
        }
        SortedMap<OffsetRange, WriteCtx> pendingWrites = getPendingWrites();
        while (!pendingWrites.isEmpty() && this.activeState) {
            long nextOffset = getNextOffset();
            OffsetRange firstKey = pendingWrites.firstKey();
            if (LOG.isTraceEnabled()) {
                LOG.trace("key.getMin()=" + firstKey.getMin() + " nextOffset=" + nextOffset);
            }
            if (firstKey.getMin() > nextOffset) {
                LOG.info("The next sequencial write has not arrived yet");
                return;
            } else {
                if (firstKey.getMin() < nextOffset && firstKey.getMax() > nextOffset) {
                    throw new IOException("Got a overlapping write (" + firstKey.getMin() + StringUtils.COMMA_STR + firstKey.getMax() + "), nextOffset=" + nextOffset);
                }
                if (LOG.isTraceEnabled()) {
                    LOG.trace("Remove write(" + firstKey.getMin() + "-" + firstKey.getMax() + ") from the list");
                }
                doSingleWrite(pendingWrites.remove(firstKey));
                updateLastAccessTime();
            }
        }
    }

    static {
        $assertionsDisabled = !OpenFileCtx.class.desiredAssertionStatus();
        LOG = LogFactory.getLog(OpenFileCtx.class);
        DUMP_WRITE_WATER_MARK = FTPFileSystem.DEFAULT_BUFFER_SIZE;
    }
}
