backends/flint/flint_table.cc

Go to the documentation of this file.
00001 /* flint_table.cc: Btree implementation
00002  *
00003  * Copyright 1999,2000,2001 BrightStation PLC
00004  * Copyright 2002 Ananova Ltd
00005  * Copyright 2002,2003,2004,2005,2006,2007 Olly Betts
00006  *
00007  * This program is free software; you can redistribute it and/or
00008  * modify it under the terms of the GNU General Public License as
00009  * published by the Free Software Foundation; either version 2 of the
00010  * License, or (at your option) any later version.
00011  *
00012  * This program is distributed in the hope that it will be useful,
00013  * but WITHOUT ANY WARRANTY; without even the implied warranty of
00014  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
00015  * GNU General Public License for more details.
00016  *
00017  * You should have received a copy of the GNU General Public License
00018  * along with this program; if not, write to the Free Software
00019  * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301
00020  * USA
00021  */
00022 
00023 #include <config.h>
00024 
00025 #include <xapian/error.h>
00026 
00027 #include "safeerrno.h"
00028 #ifdef __WIN32__
00029 # include "msvc_posix_wrapper.h"
00030 #endif
00031 
00032 #include "omassert.h"
00033 #include "stringutils.h" // For STRINGIZE().
00034 
00035 // Define to use "dangerous" mode - in this mode we write modified btree
00036 // blocks back in place.  This is somewhat faster (especially once we're
00037 // I/O bound) but the database can't be safely searched during indexing
00038 // and if indexing is terminated uncleanly, the database may be corrupt.
00039 //
00040 // Despite the risks, it's useful for speeding up a full rebuild.
00041 //
00042 // FIXME: make this mode run-time selectable, and record that it is currently
00043 // in use somewhere on disk, so readers can check and refuse to open the
00044 // database.
00045 //
00046 // #define DANGEROUS
00047 
00048 #include <sys/types.h>
00049 
00050 // Trying to include the correct headers with the correct defines set to
00051 // get pread() and pwrite() prototyped on every platform without breaking any
00052 // other platform is a real can of worms.  So instead we probe for what
00053 // prototypes (if any) are required in configure and put them into
00054 // PREAD_PROTOTYPE and PWRITE_PROTOTYPE.
00055 #if defined HAVE_PREAD && defined PREAD_PROTOTYPE
00056 PREAD_PROTOTYPE
00057 #endif
00058 #if defined HAVE_PWRITE && defined PWRITE_PROTOTYPE
00059 PWRITE_PROTOTYPE
00060 #endif
00061 
00062 #include <stdio.h>
00063 #include <string.h>   /* for memmove */
00064 #include <limits.h>   /* for CHAR_BIT */
00065 
00066 #include "flint_io.h"
00067 #include "flint_table.h"
00068 #include "flint_btreeutil.h"
00069 #include "flint_btreebase.h"
00070 #include "flint_cursor.h"
00071 #include "flint_utils.h"
00072 
00073 #include "omassert.h"
00074 #include "omdebug.h"
00075 #include <xapian/error.h>
00076 #include "utils.h"
00077 
00078 #include <algorithm>  // for std::min()
00079 #include <string>
00080 #include <vector>
00081 
00082 using namespace std;
00083 
00084 // Only try to compress tags longer than this many bytes.
00085 const size_t COMPRESS_MIN = 4;
00086 
00087 //#define BTREE_DEBUG_FULL 1
00088 #undef BTREE_DEBUG_FULL
00089 
00090 #ifdef BTREE_DEBUG_FULL
00091 /*------debugging aids from here--------*/
00092 
00093 static void print_key(const byte * p, int c, int j);
00094 static void print_tag(const byte * p, int c, int j);
00095 
00096 /*
00097 static void report_cursor(int N, Btree * B, Cursor_ * C)
00098 {
00099     int i;
00100     printf("%d)\n", N);
00101     for (i = 0; i <= B->level; i++)
00102         printf("p=%d, c=%d, n=[%d], rewrite=%d\n",
00103                 C[i].p, C[i].c, C[i].n, C[i].rewrite);
00104 }
00105 */
00106 
00107 /*------to here--------*/
00108 #endif /* BTREE_DEBUG_FULL */
00109 
00110 /* Input/output is defined with calls to the basic Unix system interface: */
00111 
00112 static void sys_unlink(const string &filename)
00113 {
00114 #ifdef __WIN32__
00115     if (msvc_posix_unlink(filename.c_str()) == -1) {
00116 #else
00117     if (unlink(filename) == -1) {
00118 #endif
00119         string message = "Failed to unlink ";
00120         message += filename;
00121         message += ": ";
00122         message += strerror(errno);
00123         throw Xapian::DatabaseCorruptError(message);
00124     }
00125 }
00126 
00127 static inline byte *zeroed_new(size_t size)
00128 {
00129     byte *temp = new byte[size];
00130     // No need to check if temp is NULL, new throws std::bad_alloc if
00131     // allocation fails.
00132     Assert(temp);
00133     memset(temp, 0, size);
00134     return temp;
00135 }
00136 
00137 /* A B-tree comprises (a) a base file, containing essential information (Block
00138    size, number of the B-tree root block etc), (b) a bitmap, the Nth bit of the
00139    bitmap being set if the Nth block of the B-tree file is in use, and (c) a
00140    file DB containing the B-tree proper. The DB file is divided into a sequence
00141    of equal sized blocks, numbered 0, 1, 2 ... some of which are free, some in
00142    use. Those in use are arranged in a tree.
00143 
00144    Each block, b, has a structure like this:
00145 
00146      R L M T D o1 o2 o3 ... oN <gap> [item] .. [item] .. [item] ...
00147      <---------- D ----------> <-M->
00148 
00149    And then,
00150 
00151    R = REVISION(b)  is the revision number the B-tree had when the block was
00152                     written into the DB file.
00153    L = GET_LEVEL(b) is the level of the block, which is the number of levels
00154                     towards the root of the B-tree structure. So leaf blocks
00155                     have level 0 and the one root block has the highest level
00156                     equal to the number of levels in the B-tree.
00157    M = MAX_FREE(b)  is the size of the gap between the end of the directory and
00158                     the first item of data. (It is not necessarily the maximum
00159                     size among the bits of space that are free, but I can't
00160                     think of a better name.)
00161    T = TOTAL_FREE(b)is the total amount of free space left in b.
00162    D = DIR_END(b)   gives the offset to the end of the directory.
00163 
00164    o1, o2 ... oN are a directory of offsets to the N items held in the block.
00165    The items are key-tag pairs, and as they occur in the directory are ordered
00166    by the keys.
00167 
00168    An item has this form:
00169 
00170            I K key x C tag
00171              <--K-->
00172            <------I------>
00173 
00174    A long tag presented through the API is split up into C tags small enough to
00175    be accommodated in the blocks of the B-tree. The key is extended to include
00176    a counter, x, which runs from 1 to C. The key is preceded by a length, K,
00177    and the whole item with a length, I, as depicted above.
00178 
00179    Here are the corresponding definitions:
00180 
00181 */
00182 
00183 #define REVISION(b)      static_cast<unsigned int>(getint4(b, 0))
00184 #define GET_LEVEL(b)     getint1(b, 4)
00185 #define MAX_FREE(b)      getint2(b, 5)
00186 #define TOTAL_FREE(b)    getint2(b, 7)
00187 #define DIR_END(b)       getint2(b, 9)
00188 #define DIR_START        11
00189 
00190 #define SET_REVISION(b, x)      setint4(b, 0, x)
00191 #define SET_LEVEL(b, x)         setint1(b, 4, x)
00192 #define SET_MAX_FREE(b, x)      setint2(b, 5, x)
00193 #define SET_TOTAL_FREE(b, x)    setint2(b, 7, x)
00194 #define SET_DIR_END(b, x)       setint2(b, 9, x)
00195 
00198 #define SEQ_START_POINT (-10)
00199 
00202 #define BLOCK_CAPACITY 4
00203 
00204 
00205 
00206 /* There are two bit maps in bit_map0 and bit_map. The nth bit of bitmap is 0
00207    if the nth block is free, otherwise 1. bit_map0 is the initial state of
00208    bitmap at the start of the current transaction.
00209 
00210    Note use of the limits.h values:
00211    UCHAR_MAX = 255, an unsigned with all bits set, and
00212    CHAR_BIT = 8, the number of bits per byte
00213 
00214    BYTE_PAIR_RANGE below is the smallest +ve number that can't be held in two
00215    bytes -- 64K effectively.
00216 */
00217 
00218 #define BYTE_PAIR_RANGE (1 << 2 * CHAR_BIT)
00219 
00221 void
00222 FlintTable::read_block(uint4 n, byte * p) const
00223 {
00224     // Log the value of p, not the contents of the block it points to...
00225     DEBUGCALL(DB, void, "FlintTable::read_block", n << ", " << (void*)p);
00226     /* Use the base bit_map_size not the bitmap's size, because
00227      * the latter is uninitialised in readonly mode.
00228      */
00229     Assert(n / CHAR_BIT < base.get_bit_map_size());
00230 
00231 #ifdef HAVE_PREAD
00232     off_t offset = off_t(block_size) * n;
00233     int m = block_size;
00234     while (true) {
00235         ssize_t bytes_read = pread(handle, reinterpret_cast<char *>(p), m,
00236                                    offset);
00237         // normal case - read succeeded, so return.
00238         if (bytes_read == m) return;
00239         if (bytes_read == -1) {
00240             if (errno == EINTR) continue;
00241             string message = "Error reading block " + om_tostring(n) + ": ";
00242             message += strerror(errno);
00243             throw Xapian::DatabaseError(message);
00244         } else if (bytes_read == 0) {
00245             string message = "Error reading block " + om_tostring(n) + ": got end of file";
00246             throw Xapian::DatabaseError(message);
00247         } else if (bytes_read < m) {
00248             /* Read part of the block, which is not an error.  We should
00249              * continue reading the rest of the block.
00250              */
00251             m -= bytes_read;
00252             p += bytes_read;
00253             offset += bytes_read;
00254         }
00255     }
00256 #else
00257     if (lseek(handle, off_t(block_size) * n, SEEK_SET) == -1) {
00258         string message = "Error seeking to block: ";
00259         message += strerror(errno);
00260         throw Xapian::DatabaseError(message);
00261     }
00262 
00263     flint_io_read(handle, reinterpret_cast<char *>(p), block_size, block_size);
00264 #endif
00265 }
00266 
00273 void
00274 FlintTable::write_block(uint4 n, const byte * p) const
00275 {
00276     DEBUGCALL(DB, void, "FlintTable::write_block", n << ", " << p);
00277     Assert(writable);
00278     /* Check that n is in range. */
00279     Assert(n / CHAR_BIT < base.get_bit_map_size());
00280 
00281     /* don't write to non-free */;
00282     AssertParanoid(base.block_free_at_start(n));
00283 
00284     /* write revision is okay */
00285     AssertEqParanoid(REVISION(p), latest_revision_number + 1);
00286 
00287     if (both_bases) {
00288         // Delete the old base before modifying the database.
00289         sys_unlink(name + "base" + other_base_letter());
00290         both_bases = false;
00291         latest_revision_number = revision_number;
00292     }
00293 
00294 #ifdef HAVE_PWRITE
00295     off_t offset = off_t(block_size) * n;
00296     int m = block_size;
00297     while (true) {
00298         ssize_t bytes_written = pwrite(handle, p, m, offset);
00299         if (bytes_written == m) {
00300             // normal case - write succeeded, so return.
00301             return;
00302         } else if (bytes_written == -1) {
00303             if (errno == EINTR) continue;
00304             string message = "Error writing block: ";
00305             message += strerror(errno);
00306             throw Xapian::DatabaseError(message);
00307         } else if (bytes_written == 0) {
00308             string message = "Error writing block: wrote no data";
00309             throw Xapian::DatabaseError(message);
00310         } else if (bytes_written < m) {
00311             /* Wrote part of the block, which is not an error.  We should
00312              * continue writing the rest of the block.
00313              */
00314             m -= bytes_written;
00315             p += bytes_written;
00316             offset += bytes_written;
00317         }
00318     }
00319 #else
00320     if (lseek(handle, (off_t)block_size * n, SEEK_SET) == -1) {
00321         string message = "Error seeking to block: ";
00322         message += strerror(errno);
00323         throw Xapian::DatabaseError(message);
00324     }
00325 
00326     flint_io_write(handle, reinterpret_cast<const char *>(p), block_size);
00327 #endif
00328 }
00329 
00330 
00331 /* A note on cursors:
00332 
00333    Each B-tree level has a corresponding array element C[j] in a
00334    cursor, C. C[0] is the leaf (or data) level, and C[B->level] is the
00335    root block level. Within a level j,
00336 
00337        C[j].p  addresses the block
00338        C[j].c  is the offset into the directory entry in the block
00339        C[j].n  is the number of the block at C[j].p
00340 
00341    A look up in the B-tree causes navigation of the blocks starting
00342    from the root. In each block, p, we find an offset, c, to an item
00343    which gives the number, n, of the block for the next level. This
00344    leads to an array of values p,c,n which are held inside the cursor.
00345 
00346    Any Btree object B has a built-in cursor, at B->C. But other cursors may
00347    be created.  If BC is a created cursor, BC->C is the cursor in the
00348    sense given above, and BC->B is the handle for the B-tree again.
00349 */
00350 
00351 
00352 void
00353 FlintTable::set_overwritten() const
00354 {
00355     DEBUGCALL(DB, void, "FlintTable::set_overwritten", "");
00356     // If we're writable, there shouldn't be another writer who could cause
00357     // overwritten to be flagged, so that's a DatabaseCorruptError.
00358     if (writable)
00359         throw Xapian::DatabaseCorruptError("Db block overwritten - are there multiple writers?");
00360     throw Xapian::DatabaseModifiedError("The revision being read has been discarded - you should call Xapian::Database::reopen() and retry the operation");
00361 }
00362 
00363 /* block_to_cursor(C, j, n) puts block n into position C[j] of cursor
00364    C, writing the block currently at C[j] back to disk if necessary.
00365    Note that
00366 
00367        C[j].rewrite
00368 
00369    is true iff C[j].n is different from block n in file DB. If it is
00370    false no rewriting is necessary.
00371 */
00372 
00373 void
00374 FlintTable::block_to_cursor(Cursor_ * C_, int j, uint4 n) const
00375 {
00376     DEBUGCALL(DB, void, "FlintTable::block_to_cursor", (void*)C_ << ", " << j << ", " << n);
00377     if (n == C_[j].n) return;
00378     byte * p = C_[j].p;
00379     Assert(p);
00380 
00381     // FIXME: only needs to be done in write mode
00382     if (C_[j].rewrite) {
00383         Assert(writable);
00384         Assert(C == C_);
00385         write_block(C_[j].n, p);
00386         C_[j].rewrite = false;
00387     }
00388     // Check if the block is in the built-in cursor (potentially in
00389     // modified form).
00390     if (writable && n == C[j].n) {
00391         memcpy(p, C[j].p, block_size);
00392     } else {
00393         read_block(n, p);
00394     }
00395 
00396     C_[j].n = n;
00397     if (j < level) {
00398         /* unsigned comparison */
00399         if (REVISION(p) > REVISION(C_[j + 1].p)) {
00400             set_overwritten();
00401             return;
00402         }
00403     }
00404     AssertEq(j, GET_LEVEL(p));
00405 }
00406 
00428 void
00429 FlintTable::alter()
00430 {
00431     DEBUGCALL(DB, void, "FlintTable::alter", "");
00432     Assert(writable);
00433 #ifdef DANGEROUS
00434     C[0].rewrite = true;
00435 #else
00436     int j = 0;
00437     byte * p = C[j].p;
00438     while (true) {
00439         if (C[j].rewrite) return; /* all new, so return */
00440         C[j].rewrite = true;
00441 
00442         uint4 n = C[j].n;
00443         if (base.block_free_at_start(n)) {
00444             Assert(REVISION(p) == latest_revision_number + 1);
00445             return;
00446         }
00447         Assert(REVISION(p) < latest_revision_number + 1);
00448         base.free_block(n);
00449         n = base.next_free_block();
00450         C[j].n = n;
00451         SET_REVISION(p, latest_revision_number + 1);
00452 
00453         if (j == level) return;
00454         j++;
00455         p = C[j].p;
00456         Item_wr_(p, C[j].c).set_block_given_by(n);
00457     }
00458 #endif
00459 }
00460 
00473 int FlintTable::find_in_block(const byte * p, Key_ key, bool leaf, int c)
00474 {
00475     DEBUGCALL_STATIC(DB, int, "FlintTable::find_in_block", reinterpret_cast<const void*>(p) << ", " << reinterpret_cast<const void*>(key.get_address()) << ", " << leaf << ", " << c);
00476     int i = DIR_START;
00477     if (leaf) i -= D2;
00478     int j = DIR_END(p);
00479 
00480     if (c != -1) {
00481         if (c < j && i < c && Item_(p, c).key() <= key)
00482             i = c;
00483         c += D2;
00484         if (c < j && i < c && key < Item_(p, c).key())
00485             j = c;
00486     }
00487 
00488     while (j - i > D2) {
00489         int k = i + ((j - i)/(D2 * 2))*D2; /* mid way */
00490         if (key < Item_(p, k).key()) j = k; else i = k;
00491     }
00492     RETURN(i);
00493 }
00494 
00502 bool
00503 FlintTable::find(Cursor_ * C_) const
00504 {
00505     DEBUGCALL(DB, bool, "FlintTable::find", (void*)C_);
00506     // Note: the parameter is needed when we're called by FlintCursor
00507     const byte * p;
00508     int c;
00509     Key_ key = kt.key();
00510     for (int j = level; j > 0; --j) {
00511         p = C_[j].p;
00512         c = find_in_block(p, key, false, C_[j].c);
00513 #ifdef BTREE_DEBUG_FULL
00514         printf("Block in FlintTable:find - code position 1");
00515         report_block_full(j, C_[j].n, p);
00516 #endif /* BTREE_DEBUG_FULL */
00517         C_[j].c = c;
00518         block_to_cursor(C_, j - 1, Item_(p, c).block_given_by());
00519     }
00520     p = C_[0].p;
00521     c = find_in_block(p, key, true, C_[0].c);
00522 #ifdef BTREE_DEBUG_FULL
00523     printf("Block in FlintTable:find - code position 2");
00524     report_block_full(0, C_[0].n, p);
00525 #endif /* BTREE_DEBUG_FULL */
00526     C_[0].c = c;
00527     if (c < DIR_START) RETURN(false);
00528     RETURN(Item_(p, c).key() == key);
00529 }
00530 
00536 void
00537 FlintTable::compact(byte * p)
00538 {
00539     DEBUGCALL(DB, void, "FlintTable::compact", (void*)p);
00540     Assert(writable);
00541     int e = block_size;
00542     byte * b = buffer;
00543     int dir_end = DIR_END(p);
00544     for (int c = DIR_START; c < dir_end; c += D2) {
00545         Item_ item(p, c);
00546         int l = item.size();
00547         e -= l;
00548         memmove(b + e, item.get_address(), l);
00549         setD(p, c, e);  /* reform in b */
00550     }
00551     memmove(p + e, b + e, block_size - e);  /* copy back */
00552     e -= dir_end;
00553     SET_TOTAL_FREE(p, e);
00554     SET_MAX_FREE(p, e);
00555 }
00556 
00560 void
00561 FlintTable::split_root(uint4 split_n)
00562 {
00563     DEBUGCALL(DB, void, "FlintTable::split_root", split_n);
00564     /* gain a level */
00565     ++level;
00566 
00567     /* check level overflow - this isn't something that should ever happen
00568      * but deserves more than an Assert()... */
00569     if (level == BTREE_CURSOR_LEVELS) {
00570         throw Xapian::DatabaseCorruptError("Btree has grown impossibly large ("STRINGIZE(BTREE_CURSOR_LEVELS)" levels)");
00571     }
00572 
00573     byte * q = zeroed_new(block_size);
00574     C[level].p = q;
00575     C[level].c = DIR_START;
00576     C[level].n = base.next_free_block();
00577     C[level].rewrite = true;
00578     SET_REVISION(q, latest_revision_number + 1);
00579     SET_LEVEL(q, level);
00580     SET_DIR_END(q, DIR_START);
00581     compact(q);   /* to reset TOTAL_FREE, MAX_FREE */
00582 
00583     /* form a null key in b with a pointer to the old root */
00584     byte b[10]; /* 7 is exact */
00585     Item_wr_ item(b);
00586     item.form_null_key(split_n);
00587     add_item(item, level);
00588 }
00589 
00605 void
00606 FlintTable::enter_key(int j, Key_ prevkey, Key_ newkey)
00607 {
00608     Assert(writable);
00609     Assert(prevkey < newkey);
00610     Assert(j >= 1);
00611 
00612     uint4 blocknumber = C[j - 1].n;
00613 
00614     // FIXME update to use Key_
00615     // Keys are truncated here: but don't truncate the count at the end away.
00616     const int newkey_len = newkey.length();
00617     int i;
00618 
00619     if (j == 1) {
00620         // Truncate the key to the minimal key which differs from prevkey,
00621         // the preceding key in the block.
00622         i = 0;
00623         const int min_len = min(newkey_len, prevkey.length());
00624         while (i < min_len && prevkey[i] == newkey[i]) {
00625             i++;
00626         }
00627 
00628         // Want one byte of difference.
00629         if (i < newkey_len) i++;
00630     } else {
00631         /* Can't truncate between branch levels, since the separated keys
00632          * are in at the leaf level, and truncating again will change the
00633          * branch point.
00634          */
00635         i = newkey_len;
00636     }
00637 
00638     byte b[UCHAR_MAX + 6];
00639     Item_wr_ item(b);
00640     Assert(i <= 256 - I2 - C2);
00641     Assert(i <= (int)sizeof(b) - I2 - C2 - 4);
00642     item.set_key_and_block(newkey, i, blocknumber);
00643 
00644     // When j > 1 we can make the first key of block p null.  This is probably
00645     // worthwhile as it trades a small amount of CPU and RAM use for a small
00646     // saving in disk use.  Other redundant keys will still creep in though.
00647     if (j > 1) {
00648         byte * p = C[j - 1].p;
00649         uint4 n = getint4(newkey.get_address(), newkey_len + K1 + C2);
00650         int new_total_free = TOTAL_FREE(p) + newkey_len + C2;
00651         // FIXME: incredibly icky going from key to item like this...
00652         Item_wr_(const_cast<byte*>(newkey.get_address()) - I2).form_null_key(n);
00653         SET_TOTAL_FREE(p, new_total_free);
00654     }
00655 
00656     C[j].c = find_in_block(C[j].p, item.key(), false, 0) + D2;
00657     C[j].rewrite = true; /* a subtle point: this *is* required. */
00658     add_item(item, j);
00659 }
00660 
00665 int
00666 FlintTable::mid_point(byte * p)
00667 {
00668     int n = 0;
00669     int dir_end = DIR_END(p);
00670     int size = block_size - TOTAL_FREE(p) - dir_end;
00671     for (int c = DIR_START; c < dir_end; c += D2) {
00672         int l = Item_(p, c).size();
00673         n += 2 * l;
00674         if (n >= size) {
00675             if (l < n - size) return c;
00676             return c + D2;
00677         }
00678     }
00679 
00680     /* falling out of mid_point */
00681     Assert(false);
00682     return 0; /* Stop compiler complaining about end of method. */
00683 }
00684 
00693 void
00694 FlintTable::add_item_to_block(byte * p, Item_wr_ kt_, int c)
00695 {
00696     Assert(writable);
00697     int dir_end = DIR_END(p);
00698     int kt_len = kt_.size();
00699     int needed = kt_len + D2;
00700     int new_total = TOTAL_FREE(p) - needed;
00701     int new_max = MAX_FREE(p) - needed;
00702 
00703     Assert(new_total >= 0);
00704 
00705     if (new_max < 0) {
00706         compact(p);
00707         new_max = MAX_FREE(p) - needed;
00708         Assert(new_max >= 0);
00709     }
00710     Assert(dir_end >= c);
00711 
00712     memmove(p + c + D2, p + c, dir_end - c);
00713     dir_end += D2;
00714     SET_DIR_END(p, dir_end);
00715 
00716     int o = dir_end + new_max;
00717     setD(p, c, o);
00718     memmove(p + o, kt_.get_address(), kt_len);
00719 
00720     SET_MAX_FREE(p, new_max);
00721 
00722     SET_TOTAL_FREE(p, new_total);
00723 }
00724 
00730 void
00731 FlintTable::add_item(Item_wr_ kt_, int j)
00732 {
00733     Assert(writable);
00734     byte * p = C[j].p;
00735     int c = C[j].c;
00736     uint4 n;
00737 
00738     int needed = kt_.size() + D2;
00739     if (TOTAL_FREE(p) < needed) {
00740         int m;
00741         // Prepare to split p. After splitting, the block is in two halves, the
00742         // lower half is split_p, the upper half p again. add_to_upper_half
00743         // becomes true when the item gets added to p, false when it gets added
00744         // to split_p.
00745 
00746         if (seq_count < 0) {
00747             // If we're not in sequential mode, we split at the mid point
00748             // of the node.
00749             m = mid_point(p);
00750         } else {
00751             // During sequential addition, split at the insert point
00752             m = c;
00753         }
00754 
00755         uint4 split_n = C[j].n;
00756         C[j].n = base.next_free_block();
00757 
00758         memcpy(split_p, p, block_size);  // replicate the whole block in split_p
00759         SET_DIR_END(split_p, m);
00760         compact(split_p);      /* to reset TOTAL_FREE, MAX_FREE */
00761 
00762         {
00763             int residue = DIR_END(p) - m;
00764             int new_dir_end = DIR_START + residue;
00765             memmove(p + DIR_START, p + m, residue);
00766             SET_DIR_END(p, new_dir_end);
00767         }
00768 
00769         compact(p);      /* to reset TOTAL_FREE, MAX_FREE */
00770 
00771         bool add_to_upper_half;
00772         if (seq_count < 0) {
00773             add_to_upper_half = (c >= m);
00774         } else {
00775             // And add item to lower half if split_p has room, otherwise upper
00776             // half
00777             add_to_upper_half = (TOTAL_FREE(split_p) < needed);
00778         }
00779 
00780         if (add_to_upper_half) {
00781             c -= (m - DIR_START);
00782             Assert(seq_count < 0 || c <= DIR_START + D2);
00783             Assert(c >= DIR_START);
00784             Assert(c <= DIR_END(p));
00785             add_item_to_block(p, kt_, c);
00786             n = C[j].n;
00787         } else {
00788             Assert(c >= DIR_START);
00789             Assert(c <= DIR_END(split_p));
00790             add_item_to_block(split_p, kt_, c);
00791             n = split_n;
00792         }
00793         write_block(split_n, split_p);
00794 
00795         // Check if we're splitting the root block.
00796         if (j == level) split_root(split_n);
00797 
00798         /* Enter a separating key at level j + 1 between */
00799         /* the last key of block split_p, and the first key of block p */
00800         enter_key(j + 1,
00801                   Item_(split_p, DIR_END(split_p) - D2).key(),
00802                   Item_(p, DIR_START).key());
00803     } else {
00804         add_item_to_block(p, kt_, c);
00805         n = C[j].n;
00806     }
00807     if (j == 0) {
00808         changed_n = n;
00809         changed_c = c;
00810     }
00811 }
00812 
00820 void
00821 FlintTable::delete_item(int j, bool repeatedly)
00822 {
00823     Assert(writable);
00824     byte * p = C[j].p;
00825     int c = C[j].c;
00826     int kt_len = Item_(p, c).size(); /* size of the item to be deleted */
00827     int dir_end = DIR_END(p) - D2;   /* directory length will go down by 2 bytes */
00828 
00829     memmove(p + c, p + c + D2, dir_end - c);
00830     SET_DIR_END(p, dir_end);
00831     SET_MAX_FREE(p, MAX_FREE(p) + D2);
00832     SET_TOTAL_FREE(p, TOTAL_FREE(p) + kt_len + D2);
00833 
00834     if (!repeatedly) return;
00835     if (j < level) {
00836         if (dir_end == DIR_START) {
00837             base.free_block(C[j].n);
00838             C[j].rewrite = false;
00839             C[j].n = BLK_UNUSED;
00840             C[j + 1].rewrite = true;  /* *is* necessary */
00841             delete_item(j + 1, true);
00842         }
00843     } else {
00844         Assert(j == level);
00845         while (dir_end == DIR_START + D2 && level > 0) {
00846             /* single item in the root block, so lose a level */
00847             uint4 new_root = Item_(p, DIR_START).block_given_by();
00848             delete [] p;
00849             C[level].p = 0;
00850             base.free_block(C[level].n);
00851             C[level].rewrite = false;
00852             C[level].n = BLK_UNUSED;
00853             level--;
00854 
00855             block_to_cursor(C, level, new_root);
00856 
00857             p = C[level].p;
00858             dir_end = DIR_END(p); /* prepare for the loop */
00859         }
00860     }
00861 }
00862 
00863 /* debugging aid:
00864 static addcount = 0;
00865 */
00866 
00892 int
00893 FlintTable::add_kt(bool found)
00894 {
00895     Assert(writable);
00896     int components = 0;
00897 
00898     /*
00899     {
00900         printf("%d) %s ", addcount++, (found ? "replacing" : "adding"));
00901         print_bytes(kt[I2] - K1 - C2, kt + I2 + K1); putchar('\n');
00902     }
00903     */
00904     alter();
00905 
00906     if (found) { /* replacement */
00907         seq_count = SEQ_START_POINT;
00908         sequential = false;
00909 
00910         byte * p = C[0].p;
00911         int c = C[0].c;
00912         Item_ item(p, c);
00913         int kt_size = kt.size();
00914         int needed = kt_size - item.size();
00915 
00916         components = Item_(p, c).components_of();
00917 
00918         if (needed <= 0) {
00919             /* simple replacement */
00920             memmove(const_cast<byte *>(item.get_address()),
00921                     kt.get_address(), kt_size);
00922             SET_TOTAL_FREE(p, TOTAL_FREE(p) - needed);
00923         } else {
00924             /* new item into the block's freespace */
00925             int new_max = MAX_FREE(p) - kt_size;
00926             if (new_max >= 0) {
00927                 int o = DIR_END(p) + new_max;
00928                 memmove(p + o, kt.get_address(), kt_size);
00929                 setD(p, c, o);
00930                 SET_MAX_FREE(p, new_max);
00931                 SET_TOTAL_FREE(p, TOTAL_FREE(p) - needed);
00932             } else {
00933                 /* do it the long way */
00934                 delete_item(0, false);
00935                 add_item(kt, 0);
00936             }
00937         }
00938     } else {
00939         /* addition */
00940         if (changed_n == C[0].n && changed_c == C[0].c) {
00941             if (seq_count < 0) seq_count++;
00942         } else {
00943             seq_count = SEQ_START_POINT;
00944             sequential = false;
00945         }
00946         C[0].c += D2;
00947         add_item(kt, 0);
00948     }
00949     return components;
00950 }
00951 
00952 /* delete_kt() corresponds to add_kt(found), but there are only
00953    two cases: if the key is not found nothing is done, and if it is
00954    found the corresponding item is deleted with delete_item.
00955 */
00956 
00957 int
00958 FlintTable::delete_kt()
00959 {
00960     Assert(writable);
00961 
00962     bool found = find(C);
00963 
00964     int components = 0;
00965     seq_count = SEQ_START_POINT;
00966     sequential = false;
00967 
00968     /*
00969     {
00970         printf("%d) %s ", addcount++, (found ? "deleting " : "ignoring "));
00971         print_bytes(B->kt[I2] - K1 - C2, B->kt + I2 + K1); putchar('\n');
00972     }
00973     */
00974     if (found) {
00975         components = Item_(C[0].p, C[0].c).components_of();
00976         alter();
00977         delete_item(0, true);
00978     }
00979     return components;
00980 }
00981 
00982 /* FlintTable::form_key(key) treats address kt as an item holder and fills in
00983 the key part:
00984 
00985            (I) K key c (C tag)
00986 
00987 The bracketed parts are left blank. The key is filled in with key_len bytes and
00988 K set accordingly. c is set to 1.
00989 */
00990 
00991 void FlintTable::form_key(const string & key) const
00992 {
00993     kt.form_key(key);
00994 }
00995 
00996 /* FlintTable::add(key, tag) adds the key/tag item to the
00997    B-tree, replacing any existing item with the same key.
00998 
00999    For a long tag, we end up having to add m components, of the form
01000 
01001        key 1 m tag1
01002        key 2 m tag2
01003        ...
01004        key m m tagm
01005 
01006    and tag1+tag2+...+tagm are equal to tag. These in their turn may be replacing
01007    n components of the form
01008 
01009        key 1 n TAG1
01010        key 2 n TAG2
01011        ...
01012        key n n TAGn
01013 
01014    and n may be greater than, equal to, or less than m. These cases are dealt
01015    with in the code below. If m < n for example, we end up with a series of
01016    deletions.
01017 */
01018 
01019 bool
01020 FlintTable::add(const string &key, string tag, bool already_compressed)
01021 {
01022     DEBUGCALL(DB, bool, "FlintTable::add", key << ", " << tag);
01023     Assert(writable);
01024 
01025     if (handle < 0) create_and_open(block_size);
01026 
01027     form_key(key);
01028 
01029     bool compressed = false;
01030     if (already_compressed) {
01031         compressed = true;
01032     } else if (compress_strategy != DONT_COMPRESS && tag.size() > COMPRESS_MIN) {
01033         CompileTimeAssert(DONT_COMPRESS != Z_DEFAULT_STRATEGY);
01034         CompileTimeAssert(DONT_COMPRESS != Z_FILTERED);
01035         CompileTimeAssert(DONT_COMPRESS != Z_HUFFMAN_ONLY);
01036 #ifdef Z_RLE
01037         CompileTimeAssert(DONT_COMPRESS != Z_RLE);
01038 #endif
01039 
01040         z_stream stream;
01041 
01042         stream.zalloc = reinterpret_cast<alloc_func>(0);
01043         stream.zfree = reinterpret_cast<free_func>(0);
01044         stream.opaque = (voidpf)0;
01045 
01046         // -15 means raw deflate with 32K LZ77 window (largest)
01047         // memLevel 9 is the highest (8 is default)
01048         int err;
01049         err = deflateInit2(&stream, Z_DEFAULT_COMPRESSION, Z_DEFLATED, -15, 9,
01050                            compress_strategy);
01051         if (err != Z_OK) {
01052             if (err == Z_MEM_ERROR) throw std::bad_alloc();
01053             string msg = "deflateInit2 failed";
01054             if (stream.msg) {
01055                 msg += " (";
01056                 msg += stream.msg;
01057                 msg += ')';
01058             }
01059             throw Xapian::DatabaseError(msg);
01060         }
01061 
01062         stream.next_in = (Bytef *)const_cast<char *>(tag.data());
01063         stream.avail_in = (uInt)tag.size();
01064 
01065         // If compressed size is >= tag.size(), we don't want to compress.
01066         unsigned long blk_len = tag.size() - 1;
01067         unsigned char * blk = new unsigned char[blk_len];
01068         stream.next_out = blk;
01069         stream.avail_out = (uInt)blk_len;
01070 
01071         err = deflate(&stream, Z_FINISH);
01072         if (err == Z_STREAM_END) {
01073             // If deflate succeeded, then the output was at least one byte
01074             // smaller than the input.
01075             tag.assign(reinterpret_cast<const char *>(blk), stream.total_out);
01076             compressed = true;
01077             err = deflateEnd(&stream);
01078             if (err != Z_OK) {
01079                 string msg = "deflateEnd failed";
01080                 if (stream.msg) {
01081                     msg += " (";
01082                     msg += stream.msg;
01083                     msg += ')';
01084                 }
01085                 throw Xapian::DatabaseError(msg);
01086             }
01087         } else {
01088             // Deflate failed - presumably the data wasn't compressible.
01089             (void)deflateEnd(&stream);
01090         }
01091 
01092         delete [] blk;
01093     }
01094 
01095     // sort of matching kt.append_chunk(), but setting the chunk
01096     const size_t cd = kt.key().length() + K1 + I2 + C2 + C2;  // offset to the tag data
01097     const size_t L = max_item_size - cd; // largest amount of tag data for any chunk
01098     size_t first_L = L;                  // - amount for tag1
01099     bool found = find(C);
01100     if (!found) {
01101         byte * p = C[0].p;
01102         size_t n = TOTAL_FREE(p) % (max_item_size + D2);
01103         if (n > D2 + cd) {
01104             n -= (D2 + cd);
01105             // if n >= last then fully filling this block won't produce
01106             // an extra item, so we might as well do this even if
01107             // full_compaction isn't active.
01108             //
01109             // In the full_compaction case, it turns out we shouldn't always
01110             // try to fill every last byte.  Doing so can actually increase the
01111             // total space required (I believe this effect is due to longer
01112             // dividing keys being required in the index blocks).  Empirically,
01113             // n >= key.size() + K appears a good criterion for K ~= 34.  This
01114             // seems to save about 0.2% in total database size over always
01115             // splitting the tag.  It'll also give be slightly faster retrieval
01116             // as we can avoid reading an extra block occasionally.
01117             size_t last = tag.length() % L;
01118             if (n >= last || (full_compaction && n >= key.size() + 34))
01119                 first_L = n;
01120         }
01121     }
01122 
01123     // a null tag must be added in of course
01124     int m = tag.empty() ? 1 : (tag.length() - first_L + L - 1) / L + 1;
01125                                       // there are m items to add
01126     /* FIXME: sort out this error higher up and turn this into
01127      * an assert.
01128      */
01129     if (m >= BYTE_PAIR_RANGE) RETURN(false);
01130 
01131     int n = 0; // initialise to shut off warning
01132                                       // - and there will be n to delete
01133     int o = 0;                        // Offset into the tag
01134     size_t residue = tag.length();    // Bytes of the tag remaining to add in
01135     int replacement = false;          // Has there been a replacement ?
01136     int i;
01137     kt.set_components_of(m);
01138     for (i = 1; i <= m; i++) {
01139         size_t l = (i == m ? residue : (i == 1 ? first_L : L));
01140         Assert(cd + l <= block_size);
01141         Assert(string::size_type(o + l) <= tag.length());
01142         kt.set_tag(cd, tag.data() + o, l, compressed);
01143         kt.set_component_of(i);
01144 
01145         o += l;
01146         residue -= l;
01147 
01148         if (i > 1) found = find(C);
01149         n = add_kt(found);
01150         if (n > 0) replacement = true;
01151     }
01152     /* o == tag.length() here, and n may be zero */
01153     for (i = m + 1; i <= n; i++) {
01154         kt.set_component_of(i);
01155         delete_kt();
01156     }
01157     if (!replacement) ++item_count;
01158     Btree_modified = true;
01159     RETURN(true);
01160 }
01161 
01162 /* FlintTable::del(key) returns false if the key is not in the B-tree,
01163    otherwise deletes it and returns true.
01164 
01165    Again, this is parallel to FlintTable::add, but simpler in form.
01166 */
01167 
01168 bool
01169 FlintTable::del(const string &key)
01170 {
01171     DEBUGCALL(DB, bool, "FlintTable::del", key);
01172     Assert(writable);
01173 
01174     if (handle < 0) RETURN(false);
01175 
01176     // We can't delete a key which we is too long for us to store.
01177     if (key.size() > FLINT_BTREE_MAX_KEY_LEN) RETURN(false);
01178 
01179     if (key.empty()) RETURN(false);
01180     form_key(key);
01181 
01182     int n = delete_kt();  /* there are n items to delete */
01183     if (n <= 0) RETURN(false);
01184 
01185     for (int i = 2; i <= n; i++) {
01186         kt.set_component_of(i);
01187         delete_kt();
01188     }
01189 
01190     item_count--;
01191     Btree_modified = true;
01192     RETURN(true);
01193 }
01194 
01195 bool
01196 FlintTable::get_exact_entry(const string &key, string & tag) const
01197 {
01198     DEBUGCALL(DB, bool, "FlintTable::get_exact_entry", key << ", " << tag);
01199     Assert(!key.empty());
01200 
01201     if (handle < 0) RETURN(false);
01202 
01203     // An oversized key can't exist, so attempting to search for it should fail.
01204     if (key.size() > FLINT_BTREE_MAX_KEY_LEN) RETURN(false);
01205 
01206     RETURN(find_tag(key, &tag));
01207 }
01208 
01209 bool
01210 FlintTable::key_exists(const string &key) const
01211 {
01212     DEBUGCALL(DB, bool, "FlintTable::key_exists", key);
01213     Assert(!key.empty());
01214 
01215     // An oversized key can't exist, so attempting to search for it should fail.
01216     if (key.size() > FLINT_BTREE_MAX_KEY_LEN) RETURN(false);
01217 
01218     form_key(key);
01219     RETURN(find(C));
01220 }
01221 
01222 bool
01223 FlintTable::find_tag(const string &key, string * tag) const
01224 {
01225     DEBUGCALL(DB, bool, "FlintTable::find_tag", key << ", &tag");
01226     if (handle == -1) RETURN(false);
01227 
01228     // An oversized key can't exist, so attempting to search for it should fail.
01229     if (key.size() > FLINT_BTREE_MAX_KEY_LEN) RETURN(false);
01230 
01231     form_key(key);
01232     if (!find(C)) RETURN(false);
01233 
01234     (void)read_tag(C, tag, false);
01235     RETURN(true);
01236 }
01237 
01238 bool
01239 FlintTable::read_tag(Cursor_ * C_, string *tag, bool keep_compressed) const
01240 {
01241     Item_ item(C_[0].p, C_[0].c);
01242 
01243     /* n components to join */
01244     int n = item.components_of();
01245 
01246     tag->resize(0);
01247     // max_item_size also includes K1 + I2 + C2 + C2 bytes overhead and the key
01248     // (which is at least 1 byte long).
01249     if (n > 1) tag->reserve((max_item_size - (1 + K1 + I2 + C2 + C2)) * n);
01250 
01251     item.append_chunk(tag);
01252     bool compressed = item.get_compressed();
01253 
01254     for (int i = 2; i <= n; i++) {
01255         if (!next(C_, 0)) {
01256             throw Xapian::DatabaseCorruptError("Unexpected end of table when reading continuation of tag");
01257         }
01258         (void)Item_(C_[0].p, C_[0].c).append_chunk(tag);
01259     }
01260     // At this point the cursor is on the last item - calling next will move
01261     // it to the next key (FlintCursor::get_tag() relies on this).
01262     if (!compressed || keep_compressed) return compressed;
01263 
01264     // FIXME: Perhaps we should we decompress each chunk as we read it so we
01265     // don't need both the full compressed and uncompressed tags in memory
01266     // at once.
01267 
01268     string utag;
01269     // May not be enough for a compressed tag, but it's a reasonable guess.
01270     utag.reserve(tag->size() + tag->size() / 2);
01271 
01272     Bytef buf[8192];
01273 
01274     z_stream stream;
01275     stream.next_out = buf;
01276     stream.avail_out = (uInt)sizeof(buf);
01277 
01278     stream.zalloc = reinterpret_cast<alloc_func>(0);
01279     stream.zfree = reinterpret_cast<free_func>(0);
01280 
01281     stream.next_in = Z_NULL;
01282     stream.avail_in = 0;
01283 
01284     int err = inflateInit2(&stream, -15);
01285     if (err != Z_OK) {
01286         if (err == Z_MEM_ERROR) throw std::bad_alloc();
01287         string msg = "inflateInit2 failed";
01288         if (stream.msg) {
01289             msg += " (";
01290             msg += stream.msg;
01291             msg += ')';
01292         }
01293         throw Xapian::DatabaseError(msg);
01294     }
01295 
01296     stream.next_in = (Bytef*)const_cast<char *>(tag->data());
01297     stream.avail_in = (uInt)tag->size();
01298 
01299     while (err != Z_STREAM_END) {
01300         stream.next_out = buf;
01301         stream.avail_out = (uInt)sizeof(buf);
01302         err = inflate(&stream, Z_SYNC_FLUSH);
01303         if (err == Z_BUF_ERROR && stream.avail_in == 0) {
01304             DEBUGLINE(DB, "Z_BUF_ERROR - faking checksum of " << stream.adler);
01305             Bytef header2[4];
01306             setint4(header2, 0, stream.adler);
01307             stream.next_in = header2;
01308             stream.avail_in = 4;
01309             err = inflate(&stream, Z_SYNC_FLUSH);
01310             if (err == Z_STREAM_END) break;
01311         }
01312 
01313         if (err != Z_OK && err != Z_STREAM_END) {
01314             if (err == Z_MEM_ERROR) throw std::bad_alloc();
01315             string msg = "inflate failed";
01316             if (stream.msg) {
01317                 msg += " (";
01318                 msg += stream.msg;
01319                 msg += ')';
01320             }
01321             throw Xapian::DatabaseError(msg);
01322         }
01323 
01324         utag.append(reinterpret_cast<const char *>(buf),
01325                     stream.next_out - buf);
01326     }
01327     if (utag.size() != stream.total_out) {
01328         string msg = "compressed tag didn't expand to the expected size: ";
01329         msg += om_tostring(utag.size());
01330         msg += " != ";
01331         // OpenBSD's zlib.h uses off_t instead of uLong for total_out.
01332         msg += om_tostring((size_t)stream.total_out);
01333         throw Xapian::DatabaseCorruptError(msg);
01334     }
01335 
01336     err = inflateEnd(&stream);
01337     if (err != Z_OK) abort();
01338 
01339     swap(*tag, utag);
01340 
01341     return false;
01342 }
01343 
01344 void
01345 FlintTable::set_full_compaction(bool parity)
01346 {
01347     Assert(writable);
01348 
01349     if (parity) seq_count = 0;
01350     full_compaction = parity;
01351 }
01352 
01353 FlintCursor * FlintTable::cursor_get() const {
01354     if (handle < 0) return NULL;
01355     // FIXME Ick - casting away const is nasty
01356     return new FlintCursor(const_cast<FlintTable *>(this));
01357 }
01358 
01359 /************ B-tree opening and closing ************/
01360 
01361 bool
01362 FlintTable::basic_open(bool revision_supplied, flint_revision_number_t revision_)
01363 {
01364     int ch = 'X'; /* will be 'A' or 'B' */
01365 
01366     {
01367         const size_t BTREE_BASES = 2;
01368         string err_msg;
01369         static const char basenames[BTREE_BASES] = { 'A', 'B' };
01370 
01371         FlintTable_base bases[BTREE_BASES];
01372         bool base_ok[BTREE_BASES];
01373 
01374         both_bases = true;
01375         bool valid_base = false;
01376         {
01377             for (size_t i = 0; i < BTREE_BASES; ++i) {
01378                 bool ok = bases[i].read(name, basenames[i], err_msg);
01379                 base_ok[i] = ok;
01380                 if (ok) {
01381                     valid_base = true;
01382                 } else {
01383                     both_bases = false;
01384                 }
01385             }
01386         }
01387 
01388         if (!valid_base) {
01389             if (handle >= 0) {
01390                 ::close(handle);
01391                 handle = -1;
01392             }
01393             string message = "Error opening table `";
01394             message += name;
01395             message += "':\n";
01396             message += err_msg;
01397             throw Xapian::DatabaseOpeningError(message);
01398         }
01399 
01400         if (revision_supplied) {
01401             bool found_revision = false;
01402             for (size_t i = 0; i < BTREE_BASES; ++i) {
01403                 if (base_ok[i] && bases[i].get_revision() == revision_) {
01404                     ch = basenames[i];
01405                     found_revision = true;
01406                     break;
01407                 }
01408             }
01409             if (!found_revision) {
01410                 /* Couldn't open the revision that was asked for.
01411                  * This shouldn't throw an exception, but should just return
01412                  * false to upper levels.
01413                  */
01414                 return false;
01415             }
01416         } else {
01417             flint_revision_number_t highest_revision = 0;
01418             for (size_t i = 0; i < BTREE_BASES; ++i) {
01419                 if (base_ok[i] && bases[i].get_revision() >= highest_revision) {
01420                     ch = basenames[i];
01421                     highest_revision = bases[i].get_revision();
01422                 }
01423             }
01424         }
01425 
01426         FlintTable_base *basep = 0;
01427         FlintTable_base *other_base = 0;
01428 
01429         for (size_t i = 0; i < BTREE_BASES; ++i) {
01430             DEBUGLINE(UNKNOWN, "Checking (ch == " << ch << ") against "
01431                       "basenames[" << i << "] == " << basenames[i]);
01432             DEBUGLINE(UNKNOWN, "bases[" << i << "].get_revision() == " <<
01433                       bases[i].get_revision());
01434             DEBUGLINE(UNKNOWN, "base_ok[" << i << "] == " << base_ok[i]);
01435             if (ch == basenames[i]) {
01436                 basep = &bases[i];
01437 
01438                 // FIXME: assuming only two bases for other_base
01439                 size_t otherbase_num = 1-i;
01440                 if (base_ok[otherbase_num]) {
01441                     other_base = &bases[otherbase_num];
01442                 }
01443                 break;
01444             }
01445         }
01446         Assert(basep);
01447 
01448         /* basep now points to the most recent base block */
01449 
01450         /* Avoid copying the bitmap etc. - swap contents with the base
01451          * object in the vector, since it'll be destroyed anyway soon.
01452          */
01453         base.swap(*basep);
01454 
01455         revision_number =  base.get_revision();
01456         block_size =       base.get_block_size();
01457         root =             base.get_root();
01458         level =            base.get_level();
01459         //bit_map_size =     basep->get_bit_map_size();
01460         item_count =       base.get_item_count();
01461         faked_root_block = base.get_have_fakeroot();
01462         sequential =       base.get_sequential();
01463 
01464         if (other_base != 0) {
01465             latest_revision_number = other_base->get_revision();
01466             if (revision_number > latest_revision_number)
01467                 latest_revision_number = revision_number;
01468         } else {
01469             latest_revision_number = revision_number;
01470         }
01471     }
01472 
01473     /* kt holds constructed items as well as keys */
01474     kt = Item_wr_(zeroed_new(block_size));
01475 
01476     set_max_item_size(BLOCK_CAPACITY);
01477 
01478     base_letter = ch;
01479 
01480     /* ready to open the main file */
01481 
01482     return true;
01483 }
01484 
01485 void
01486 FlintTable::read_root()
01487 {
01488     if (faked_root_block) {
01489         /* root block for an unmodified database. */
01490         byte * p = C[0].p;
01491         Assert(p);
01492 
01493         /* clear block - shouldn't be neccessary, but is a bit nicer,
01494          * and means that the same operations should always produce
01495          * the same database. */
01496         memset(p, 0, block_size);
01497 
01498         int o = block_size - I2 - K1 - C2 - C2;
01499         Item_wr_(p + o).fake_root_item();
01500 
01501         setD(p, DIR_START, o);         // its directory entry
01502         SET_DIR_END(p, DIR_START + D2);// the directory size
01503 
01504         o -= (DIR_START + D2);
01505         SET_MAX_FREE(p, o);
01506         SET_TOTAL_FREE(p, o);
01507         SET_LEVEL(p, 0);
01508 
01509         if (!writable) {
01510             /* reading - revision number doesn't matter as long as
01511              * it's not greater than the current one. */
01512             SET_REVISION(p, 0);
01513             C[0].n = 0;
01514         } else {
01515             /* writing - */
01516             SET_REVISION(p, latest_revision_number + 1);
01517             C[0].n = base.next_free_block();
01518         }
01519     } else {
01520         /* using a root block stored on disk */
01521         block_to_cursor(C, level, root);
01522 
01523         if (REVISION(C[level].p) > revision_number) set_overwritten();
01524         /* although this is unlikely */
01525     }
01526 }
01527 
01528 bool
01529 FlintTable::do_open_to_write(bool revision_supplied,
01530                              flint_revision_number_t revision_,
01531                              bool create_db)
01532 {
01533     if (handle == -2) {
01534         throw Xapian::DatabaseError("Database has been closed");
01535     }
01536     int flags = O_RDWR | O_BINARY;
01537     if (create_db) flags |= O_CREAT | O_TRUNC;
01538     handle = ::open((name + "DB").c_str(), flags, 0666);
01539     if (handle < 0) {
01540         // lazy doesn't make a lot of sense with create_db anyway, but ENOENT
01541         // with O_CREAT means a parent directory doesn't exist.
01542         if (lazy && !create_db && errno == ENOENT) {
01543             revision_number = revision_;
01544             return true;
01545         }
01546         string message(create_db ? "Couldn't create " : "Couldn't open ");
01547         message += name;
01548         message += "DB read/write: ";
01549         message += strerror(errno);
01550         throw Xapian::DatabaseOpeningError(message);
01551     }
01552 
01553     if (!basic_open(revision_supplied, revision_)) {
01554 	::close(handle);
01555         handle = -1;
01556         if (!revision_supplied) {
01557             throw Xapian::DatabaseOpeningError("Failed to open for writing");
01558         }
01559         /* When the revision is supplied, it's not an exceptional
01560          * case when open failed, so we just return false here.
01561          */
01562         return false;
01563     }
01564 
01565     writable = true;
01566 
01567     for (int j = 0; j <= level; j++) {
01568         C[j].n = BLK_UNUSED;
01569         C[j].p = new byte[block_size];
01570         if (C[j].p == 0) {
01571             throw std::bad_alloc();
01572         }
01573     }
01574     split_p = new byte[block_size];
01575     if (split_p == 0) {
01576         throw std::bad_alloc();
01577     }
01578     read_root();
01579 
01580     buffer = zeroed_new(block_size);
01581 
01582     changed_n = 0;
01583     changed_c = DIR_START;
01584     seq_count = SEQ_START_POINT;
01585 
01586     return true;
01587 }
01588 
01589 FlintTable::FlintTable(string path_, bool readonly_,
01590                        int compress_strategy_, bool lazy_)
01591         : revision_number(0),
01592           item_count(0),
01593           block_size(0),
01594           latest_revision_number(0),
01595           both_bases(false),
01596           base_letter('A'),
01597           faked_root_block(true),
01598           sequential(true),
01599           handle(-1),
01600           level(0),
01601           root(0),
01602           kt(0),
01603           buffer(0),
01604           base(),
01605           name(path_),
01606           seq_count(0),
01607           changed_n(0),
01608           changed_c(0),
01609           max_item_size(0),
01610           Btree_modified(false),
01611           full_compaction(false),
01612           writable(!readonly_),
01613           split_p(0),
01614           compress_strategy(compress_strategy_),
01615           lazy(lazy_)
01616 {
01617     DEBUGCALL(DB, void, "FlintTable::FlintTable", 
01618               tablename_ << "," << path_ << ", " << readonly_ << ", " <<
01619               compress_strategy_ << ", " << lazy_);
01620 }
01621 
01622 bool
01623 FlintTable::exists() const {
01624     DEBUGCALL(DB, bool, "FlintTable::exists", "");
01625     return (file_exists(name + "DB") &&
01626             (file_exists(name + "baseA") || file_exists(name + "baseB")));
01627 }
01628 
01632 static void
01633 sys_unlink_if_exists(const string & filename)
01634 {
01635     if (unlink(filename) == -1) {
01636         if (errno == ENOENT) return;
01637         throw Xapian::DatabaseError("Can't delete file: `" + filename +
01638                               "': " + strerror(errno));
01639     }
01640 }
01641 
01642 void
01643 FlintTable::erase()
01644 {
01645     DEBUGCALL(DB, void, "FlintTable::erase", "");
01646     close();
01647 
01648     sys_unlink_if_exists(name + "baseA");
01649     sys_unlink_if_exists(name + "baseB");
01650     sys_unlink_if_exists(name + "DB");
01651 }
01652 
01653 void
01654 FlintTable::set_block_size(unsigned int block_size_)
01655 {
01656     DEBUGCALL(DB, void, "FlintTable::set_block_size", block_size_);
01657     // Block size must in the range 2048..BYTE_PAIR_RANGE, and a power of two.
01658     if (block_size_ < 2048 || block_size_ > BYTE_PAIR_RANGE ||
01659         (block_size_ & (block_size_ - 1)) != 0) {
01660         block_size_ = FLINT_DEFAULT_BLOCK_SIZE;
01661     }
01662     block_size = block_size_;
01663 }
01664 
01665 void
01666 FlintTable::create_and_open(unsigned int block_size_)
01667 {
01668     DEBUGCALL(DB, void, "FlintTable::create_and_open", block_size_);
01669     if (handle == -2) {
01670         throw Xapian::DatabaseError("Database has been closed");
01671     }
01672     Assert(writable);
01673     close();
01674 
01675     if (block_size_ == 0) abort();
01676     set_block_size(block_size_);
01677 
01678     // FIXME: it would be good to arrange that this works such that there's
01679     // always a valid table in place if you run create_and_open() on an
01680     // existing table.
01681 
01682     /* write initial values to files */
01683 
01684     /* create the base file */
01685     FlintTable_base base_;
01686     base_.set_revision(revision_number);
01687     base_.set_block_size(block_size_);
01688     base_.set_have_fakeroot(true);
01689     base_.set_sequential(true);
01690     base_.write_to_file(name + "baseA");
01691 
01692     /* remove the alternative base file, if any */
01693     sys_unlink_if_exists(name + "baseB");
01694 
01695     // Any errors are thrown if revision_supplied is false.
01696     (void)do_open_to_write(false, 0, true);
01697 }
01698 
01699 FlintTable::~FlintTable() {
01700     DEBUGCALL(DB, void, "FlintTable::~FlintTable", "");
01701     FlintTable::close();
01702 }
01703 
01704 void FlintTable::close(bool permanent) {
01705     DEBUGCALL(DB, void, "FlintTable::close", "");
01706 
01707     if (handle >= 0) {
01708         // If an error occurs here, we just ignore it, since we're just
01709         // trying to free everything.
01710         (void)::close(handle);
01711         handle = -1;
01712     }
01713 
01714     if (permanent) {
01715         handle = -2;
01716     }
01717 
01718     for (int j = level; j >= 0; j--) {
01719         delete [] C[j].p;
01720         C[j].p = 0;
01721     }
01722     delete [] split_p;
01723     split_p = 0;
01724 
01725     delete [] kt.get_address();
01726     kt = 0;
01727     delete [] buffer;
01728     buffer = 0;
01729 }
01730 
01731 void
01732 FlintTable::commit(flint_revision_number_t revision)
01733 {
01734     DEBUGCALL(DB, void, "FlintTable::commit", revision);
01735     Assert(writable);
01736 
01737     if (revision <= revision_number) {
01738         throw Xapian::DatabaseError("New revision too low");
01739     }
01740 
01741     if (handle < 0) {
01742         latest_revision_number = revision_number = revision;
01743         return;
01744     }
01745 
01746     // FIXME: this doesn't work (probably because the table revisions get
01747     // out of step) but it's wasteful to keep applying changes to value
01748     // and position if they're never used...
01749     //
01750     // if (!Btree_modified) return;
01751 
01752     for (int j = level; j >= 0; j--) {
01753         if (C[j].rewrite) {
01754             write_block(C[j].n, C[j].p);
01755         }
01756     }
01757 
01758     if (Btree_modified) {
01759         faked_root_block = false;
01760     }
01761 
01762     try {
01763         if (faked_root_block) {
01764             /* We will use a dummy bitmap. */
01765             base.clear_bit_map();
01766         }
01767 
01768         base.set_revision(revision);
01769         base.set_root(C[level].n);
01770         base.set_level(level);
01771         base.set_item_count(item_count);
01772         base.set_have_fakeroot(faked_root_block);
01773         base.set_sequential(sequential);
01774 
01775         base_letter = other_base_letter();
01776 
01777         both_bases = true;
01778         latest_revision_number = revision_number = revision;
01779         root = C[level].n;
01780 
01781         Btree_modified = false;
01782 
01783         for (int i = 0; i < BTREE_CURSOR_LEVELS; ++i) {
01784             C[i].n = BLK_UNUSED;
01785             C[i].c = -1;
01786             C[i].rewrite = false;
01787         }
01788 
01789         if (!flint_io_sync(handle)) {
01790             (void)::close(handle);
01791             handle = -1;
01792             throw Xapian::DatabaseError("Can't commit new revision - failed to flush DB to disk");
01793         }
01794 
01795         // Save to "<table>.tmp" and then rename to "<table>.base<letter>" so
01796         // that a reader can't try to read a partially written base file.
01797         string tmp = name;
01798         tmp += "tmp";
01799         string basefile = name;
01800         basefile += "base";
01801         basefile += char(base_letter);
01802         base.write_to_file(tmp);
01803 #if defined __WIN32__
01804         if (msvc_posix_rename(tmp.c_str(), basefile.c_str()) < 0) {
01805 #else
01806         if (rename(tmp.c_str(), basefile.c_str()) < 0) {
01807 #endif
01808             // With NFS, rename() failing may just mean that the server crashed
01809             // after successfully renaming, but before reporting this, and then
01810             // the retried operation fails.  So we need to check if the source
01811             // file still exists, which we do by calling unlink(), since we want
01812             // to remove the temporary file anyway.
01813             int saved_errno = errno;
01814             if (unlink(tmp) == 0 || errno != ENOENT) {
01815                 string msg("Couldn't update base file ");
01816                 msg += basefile;
01817                 msg += ": ";
01818                 msg += strerror(saved_errno);
01819                 throw Xapian::DatabaseError(msg);
01820             }
01821         }
01822         base.commit();
01823 
01824         read_root();
01825 
01826         changed_n = 0;
01827         changed_c = DIR_START;
01828         seq_count = SEQ_START_POINT;
01829     } catch(...) {
01830         FlintTable::close();
01831         throw;
01832     }
01833 }
01834 
01835 void
01836 FlintTable::cancel()
01837 {
01838     DEBUGCALL(DB, void, "FlintTable::cancel", "");
01839     Assert(writable);
01840 
01841     if (handle < 0) {
01842         latest_revision_number = revision_number; // FIXME: we can end up reusing a revision if we opened a btree at an older revision, start to modify it, then cancel...
01843         return;
01844     }
01845 
01846     // This causes problems: if (!Btree_modified) return;
01847 
01848     string err_msg;
01849     if (!base.read(name, base_letter, err_msg)) {
01850         throw Xapian::DatabaseCorruptError(string("Couldn't reread base ") + base_letter);
01851     }
01852 
01853     revision_number =  base.get_revision();
01854     block_size =       base.get_block_size();
01855     root =             base.get_root();
01856     level =            base.get_level();
01857     //bit_map_size =     basep->get_bit_map_size();
01858     item_count =       base.get_item_count();
01859     faked_root_block = base.get_have_fakeroot();
01860     sequential =       base.get_sequential();
01861 
01862     latest_revision_number = revision_number; // FIXME: we can end up reusing a revision if we opened a btree at an older revision, start to modify it, then cancel...
01863 
01864     for (int j = 0; j <= level; j++) {
01865         C[j].n = BLK_UNUSED;
01866         C[j].rewrite = false;
01867     }
01868     read_root();
01869 
01870     changed_n = 0;
01871     changed_c = DIR_START;
01872     seq_count = SEQ_START_POINT;
01873 }
01874 
01875 /************ B-tree reading ************/
01876 
01877 bool
01878 FlintTable::do_open_to_read(bool revision_supplied, flint_revision_number_t revision_)
01879 {
01880     if (handle == -2) {
01881         throw Xapian::DatabaseError("Database has been closed");
01882     }
01883     handle = ::open((name + "DB").c_str(), O_RDONLY | O_BINARY);
01884     if (handle < 0) {
01885         if (lazy) {
01886             // This table is optional when reading!
01887             revision_number = revision_;
01888             return true;
01889         }
01890         string message("Couldn't open ");
01891         message += name;
01892         message += "DB to read: ";
01893         message += strerror(errno);
01894         throw Xapian::DatabaseOpeningError(message);
01895     }
01896 
01897     if (!basic_open(revision_supplied, revision_)) {
01898 	::close(handle);
01899         handle = -1;
01900         if (revision_supplied) {
01901             // The requested revision was not available.
01902             // This could be because the database was modified underneath us, or
01903             // because a base file is missing.  Return false, and work out what
01904             // the problem was at a higher level.
01905             return false;
01906         }
01907         throw Xapian::DatabaseOpeningError("Failed to open table for reading");
01908     }
01909 
01910     for (int j = 0; j <= level; j++) {
01911         C[j].n = BLK_UNUSED;
01912         C[j].p = new byte[block_size];
01913         if (C[j].p == 0) {
01914             throw std::bad_alloc();
01915         }
01916     }
01917 
01918     read_root();
01919     return true;
01920 }
01921 
01922 void
01923 FlintTable::open()
01924 {
01925     DEBUGCALL(DB, void, "FlintTable::open", "");
01926     DEBUGLINE(DB, "opening at path " << name);
01927     close();
01928 
01929     if (!writable) {
01930         // Any errors are thrown if revision_supplied is false
01931         (void)do_open_to_read(false, 0);
01932         return;
01933     }
01934 
01935     // Any errors are thrown if revision_supplied is false.
01936     (void)do_open_to_write(false, 0);
01937 }
01938 
01939 bool
01940 FlintTable::open(flint_revision_number_t revision)
01941 {
01942     DEBUGCALL(DB, bool, "FlintTable::open", revision);
01943     DEBUGLINE(DB, "opening for particular revision at path " << name);
01944     close();
01945 
01946     if (!writable) {
01947         if (do_open_to_read(true, revision)) {
01948             AssertEq(revision_number, revision);
01949             RETURN(true);
01950         } else {
01951             close();
01952             RETURN(false);
01953         }
01954     }
01955 
01956     if (!do_open_to_write(true, revision)) {
01957         // Can't open at the requested revision.
01958         close();
01959         RETURN(false);
01960     }
01961 
01962     AssertEq(revision_number, revision);
01963     RETURN(true);
01964 }
01965 
01966 bool
01967 FlintTable::prev_for_sequential(Cursor_ * C_, int /*dummy*/) const
01968 {
01969     int c = C_[0].c;
01970     if (c == DIR_START) {
01971         byte * p = C_[0].p;
01972         Assert(p);
01973         uint4 n = C_[0].n;
01974         while (true) {
01975             if (n == 0) return false;
01976             n--;
01977             if (writable) {
01978                 if (n == C[0].n) {
01979                     // Block is a leaf block in the built-in cursor
01980                     // (potentially in modified form).
01981                     memcpy(p, C[0].p, block_size);
01982                 } else {
01983                     // Blocks in the built-in cursor may not have been written
01984                     // to disk yet, so we have to check that the block number
01985                     // isn't in the built-in cursor or we'll read an
01986                     // uninitialised block (for which GET_LEVEL(p) will
01987                     // probably return 0).
01988                     int j;
01989                     for (j = 1; j <= level; ++j) {
01990                         if (n == C[j].n) break;
01991                     }
01992                     if (j <= level) continue;
01993 
01994                     // Block isn't in the built-in cursor, so the form on disk
01995                     // is valid, so read it to check if it's the next level 0
01996                     // block.
01997                     read_block(n, p);
01998                 }
01999             } else {
02000                 read_block(n, p);
02001             }
02002             if (writable) AssertEq(revision_number, latest_revision_number);
02003             if (REVISION(p) > revision_number + writable) {
02004                 set_overwritten();
02005                 return false;
02006             }
02007             if (GET_LEVEL(p) == 0) break;
02008         }
02009         c = DIR_END(p);
02010         C_[0].n = n;
02011     }
02012     c -= D2;
02013     C_[0].c = c;
02014     return true;
02015 }
02016 
02017 bool
02018 FlintTable::next_for_sequential(Cursor_ * C_, int /*dummy*/) const
02019 {
02020     byte * p = C_[0].p;
02021     Assert(p);
02022     int c = C_[0].c;
02023     c += D2;
02024     Assert((unsigned)c < block_size);
02025     if (c == DIR_END(p)) {
02026         uint4 n = C_[0].n;
02027         while (true) {
02028             n++;
02029             if (n > base.get_last_block()) return false;
02030             if (writable) {
02031                 if (n == C[0].n) {
02032                     // Block is a leaf block in the built-in cursor
02033                     // (potentially in modified form).
02034                     memcpy(p, C[0].p, block_size);
02035                 } else {
02036                     // Blocks in the built-in cursor may not have been written
02037                     // to disk yet, so we have to check that the block number
02038                     // isn't in the built-in cursor or we'll read an
02039                     // uninitialised block (for which GET_LEVEL(p) will
02040                     // probably return 0).
02041                     int j;
02042                     for (j = 1; j <= level; ++j) {
02043                         if (n == C[j].n) break;
02044                     }
02045                     if (j <= level) continue;
02046 
02047                     // Block isn't in the built-in cursor, so the form on disk
02048                     // is valid, so read it to check if it's the next level 0
02049                     // block.
02050                     read_block(n, p);
02051                 }
02052             } else {
02053                 read_block(n, p);
02054             }
02055 
02056             if (writable) AssertEq(revision_number, latest_revision_number);
02057             if (REVISION(p) > revision_number + writable) {
02058                 set_overwritten();
02059                 return false;
02060             }
02061             if (GET_LEVEL(p) == 0) break;
02062         }
02063         c = DIR_START;
02064         C_[0].n = n;
02065     }
02066     C_[0].c = c;
02067     return true;
02068 }
02069 
02070 bool
02071 FlintTable::prev_default(Cursor_ * C_, int j) const
02072 {
02073     byte * p = C_[j].p;
02074     int c = C_[j].c;
02075     Assert(c >= DIR_START);
02076     Assert((unsigned)c < block_size);
02077     Assert(c <= DIR_END(p));
02078     if (c == DIR_START) {
02079         if (j == level) return false;
02080         if (!prev_default(C_, j + 1)) return false;
02081         c = DIR_END(p);
02082     }
02083     c -= D2;
02084     C_[j].c = c;
02085     if (j > 0) {
02086         block_to_cursor(C_, j - 1, Item_(p, c).block_given_by());
02087     }
02088     return true;
02089 }
02090 
02091 bool
02092 FlintTable::next_default(Cursor_ * C_, int j) const
02093 {
02094     byte * p = C_[j].p;
02095     int c = C_[j].c;
02096     Assert(c >= DIR_START);
02097     c += D2;
02098     Assert((unsigned)c < block_size);
02099     // Sometimes c can be DIR_END(p) + 2 here it appears...
02100     if (c > DIR_END(p)) c = DIR_END(p);
02101     Assert(c <= DIR_END(p));
02102     if (c == DIR_END(p)) {
02103         if (j == level) return false;
02104         if (!next_default(C_, j + 1)) return false;
02105         c = DIR_START;
02106     }
02107     C_[j].c = c;
02108     if (j > 0) {
02109         block_to_cursor(C_, j - 1, Item_(p, c).block_given_by());
02110 #ifdef BTREE_DEBUG_FULL
02111         printf("Block in FlintTable:next_default");
02112         report_block_full(j - 1, C_[j - 1].n, C_[j - 1].p);
02113 #endif /* BTREE_DEBUG_FULL */
02114     }
02115     return true;
02116 }
02117 
02133 bool Key_::operator<(Key_ key2) const
02134 {
02135     DEBUGCALL(DB, bool, "Key_::operator<", static_cast<const void*>(key2.p));
02136     int key1_len = length();
02137     int key2_len = key2.length();
02138     if (key1_len == key2_len) {
02139         // The keys are the same length, so we can compare the counts
02140         // in the same operation since they're stored as 2 byte
02141         // bigendian numbers.
02142         RETURN(memcmp(p + K1, key2.p + K1, key1_len + C2) < 0);
02143     }
02144 
02145     int k_smaller = (key2_len < key1_len ? key2_len : key1_len);
02146 
02147     // Compare the common part of the keys
02148     int diff = memcmp(p + K1, key2.p + K1, k_smaller);
02149     if (diff != 0) RETURN(diff < 0);
02150 
02151     // We dealt with the "same length" case above so we never need to check
02152     // the count here.
02153     RETURN(key1_len < key2_len);
02154 }
02155 
02156 bool Key_::operator==(Key_ key2) const
02157 {
02158     DEBUGCALL(DB, bool, "Key_::operator==", static_cast<const void*>(key2.p));
02159     int key1_len = length();
02160     if (key1_len != key2.length()) return false;
02161     // The keys are the same length, so we can compare the counts
02162     // in the same operation since they're stored as 2 byte
02163     // bigendian numbers.
02164     RETURN(memcmp(p + K1, key2.p + K1, key1_len + C2) == 0);
02165 }

Documentation for Xapian (version 1.0.10).
Generated on 24 Dec 2008 by Doxygen 1.5.2.