Main Page | Class Hierarchy | Data Structures | Directories | File List | Data Fields | Related Pages

db_join.c

00001 /*
00002  * See the file LICENSE for redistribution information.
00003  *
00004  * Copyright (c) 1998-2005
00005  *      Sleepycat Software.  All rights reserved.
00006  *
00007  * $Id: db_join.c,v 12.6 2005/10/07 20:21:22 ubell Exp $
00008  */
00009 
00010 #include "db_config.h"
00011 
00012 #ifndef NO_SYSTEM_INCLUDES
00013 #include <sys/types.h>
00014 
00015 #include <stdlib.h>
00016 #include <string.h>
00017 #endif
00018 
00019 #include "db_int.h"
00020 #include "dbinc/db_page.h"
00021 #include "dbinc/db_join.h"
00022 #include "dbinc/btree.h"
00023 
00024 static int __db_join_close_pp __P((DBC *));
00025 static int __db_join_cmp __P((const void *, const void *));
00026 static int __db_join_del __P((DBC *, u_int32_t));
00027 static int __db_join_get __P((DBC *, DBT *, DBT *, u_int32_t));
00028 static int __db_join_get_pp __P((DBC *, DBT *, DBT *, u_int32_t));
00029 static int __db_join_getnext __P((DBC *, DBT *, DBT *, u_int32_t, u_int32_t));
00030 static int __db_join_primget __P((DB *,
00031     DB_TXN *, u_int32_t, DBT *, DBT *, u_int32_t));
00032 static int __db_join_put __P((DBC *, DBT *, DBT *, u_int32_t));
00033 
00034 /*
00035  * Check to see if the Nth secondary cursor of join cursor jc is pointing
00036  * to a sorted duplicate set.
00037  */
00038 #define SORTED_SET(jc, n)   ((jc)->j_curslist[(n)]->dbp->dup_compare != NULL)
00039 
00040 /*
00041  * This is the duplicate-assisted join functionality.  Right now we're
00042  * going to write it such that we return one item at a time, although
00043  * I think we may need to optimize it to return them all at once.
00044  * It should be easier to get it working this way, and I believe that
00045  * changing it should be fairly straightforward.
00046  *
00047  * We optimize the join by sorting cursors from smallest to largest
00048  * cardinality.  In most cases, this is indeed optimal.  However, if
00049  * a cursor with large cardinality has very few data in common with the
00050  * first cursor, it is possible that the join will be made faster by
00051  * putting it earlier in the cursor list.  Since we have no way to detect
00052  * cases like this, we simply provide a flag, DB_JOIN_NOSORT, which retains
00053  * the sort order specified by the caller, who may know more about the
00054  * structure of the data.
00055  *
00056  * The first cursor moves sequentially through the duplicate set while
00057  * the others search explicitly for the duplicate in question.
00058  *
00059  */
00060 
00061 /*
00062  * __db_join --
00063  *      This is the interface to the duplicate-assisted join functionality.
00064  * In the same way that cursors mark a position in a database, a cursor
00065  * can mark a position in a join.  While most cursors are created by the
00066  * cursor method of a DB, join cursors are created through an explicit
00067  * call to DB->join.
00068  *
00069  * The curslist is an array of existing, initialized cursors and primary
00070  * is the DB of the primary file.  The data item that joins all the
00071  * cursors in the curslist is used as the key into the primary and that
00072  * key and data are returned.  When no more items are left in the join
00073  * set, the  c_next operation off the join cursor will return DB_NOTFOUND.
00074  *
00075  * PUBLIC: int __db_join __P((DB *, DBC **, DBC **, u_int32_t));
00076  */
00077 int
00078 __db_join(primary, curslist, dbcp, flags)
00079         DB *primary;
00080         DBC **curslist, **dbcp;
00081         u_int32_t flags;
00082 {
00083         DB_ENV *dbenv;
00084         DBC *dbc;
00085         JOIN_CURSOR *jc;
00086         size_t ncurs, nslots;
00087         u_int32_t i;
00088         int ret;
00089 
00090         dbenv = primary->dbenv;
00091         dbc = NULL;
00092         jc = NULL;
00093 
00094         if ((ret = __os_calloc(dbenv, 1, sizeof(DBC), &dbc)) != 0)
00095                 goto err;
00096 
00097         if ((ret = __os_calloc(dbenv, 1, sizeof(JOIN_CURSOR), &jc)) != 0)
00098                 goto err;
00099 
00100         if ((ret = __os_malloc(dbenv, 256, &jc->j_key.data)) != 0)
00101                 goto err;
00102         jc->j_key.ulen = 256;
00103         F_SET(&jc->j_key, DB_DBT_USERMEM);
00104 
00105         F_SET(&jc->j_rdata, DB_DBT_REALLOC);
00106 
00107         for (jc->j_curslist = curslist;
00108             *jc->j_curslist != NULL; jc->j_curslist++)
00109                 ;
00110 
00111         /*
00112          * The number of cursor slots we allocate is one greater than
00113          * the number of cursors involved in the join, because the
00114          * list is NULL-terminated.
00115          */
00116         ncurs = (size_t)(jc->j_curslist - curslist);
00117         nslots = ncurs + 1;
00118 
00119         /*
00120          * !!! -- A note on the various lists hanging off jc.
00121          *
00122          * j_curslist is the initial NULL-terminated list of cursors passed
00123          * into __db_join.  The original cursors are not modified; pristine
00124          * copies are required because, in databases with unsorted dups, we
00125          * must reset all of the secondary cursors after the first each
00126          * time the first one is incremented, or else we will lose data
00127          * which happen to be sorted differently in two different cursors.
00128          *
00129          * j_workcurs is where we put those copies that we're planning to
00130          * work with.  They're lazily c_dup'ed from j_curslist as we need
00131          * them, and closed when the join cursor is closed or when we need
00132          * to reset them to their original values (in which case we just
00133          * c_dup afresh).
00134          *
00135          * j_fdupcurs is an array of cursors which point to the first
00136          * duplicate in the duplicate set that contains the data value
00137          * we're currently interested in.  We need this to make
00138          * __db_join_get correctly return duplicate duplicates;  i.e., if a
00139          * given data value occurs twice in the set belonging to cursor #2,
00140          * and thrice in the set belonging to cursor #3, and once in all
00141          * the other cursors, successive calls to __db_join_get need to
00142          * return that data item six times.  To make this happen, each time
00143          * cursor N is allowed to advance to a new datum, all cursors M
00144          * such that M > N have to be reset to the first duplicate with
00145          * that datum, so __db_join_get will return all the dup-dups again.
00146          * We could just reset them to the original cursor from j_curslist,
00147          * but that would be a bit slower in the unsorted case and a LOT
00148          * slower in the sorted one.
00149          *
00150          * j_exhausted is a list of boolean values which represent
00151          * whether or not their corresponding cursors are "exhausted",
00152          * i.e. whether the datum under the corresponding cursor has
00153          * been found not to exist in any unreturned combinations of
00154          * later secondary cursors, in which case they are ready to be
00155          * incremented.
00156          */
00157 
00158         /* We don't want to free regions whose callocs have failed. */
00159         jc->j_curslist = NULL;
00160         jc->j_workcurs = NULL;
00161         jc->j_fdupcurs = NULL;
00162         jc->j_exhausted = NULL;
00163 
00164         if ((ret = __os_calloc(dbenv, nslots, sizeof(DBC *),
00165             &jc->j_curslist)) != 0)
00166                 goto err;
00167         if ((ret = __os_calloc(dbenv, nslots, sizeof(DBC *),
00168             &jc->j_workcurs)) != 0)
00169                 goto err;
00170         if ((ret = __os_calloc(dbenv, nslots, sizeof(DBC *),
00171             &jc->j_fdupcurs)) != 0)
00172                 goto err;
00173         if ((ret = __os_calloc(dbenv, nslots, sizeof(u_int8_t),
00174             &jc->j_exhausted)) != 0)
00175                 goto err;
00176         for (i = 0; curslist[i] != NULL; i++) {
00177                 jc->j_curslist[i] = curslist[i];
00178                 jc->j_workcurs[i] = NULL;
00179                 jc->j_fdupcurs[i] = NULL;
00180                 jc->j_exhausted[i] = 0;
00181         }
00182         jc->j_ncurs = (u_int32_t)ncurs;
00183 
00184         /*
00185          * If DB_JOIN_NOSORT is not set, optimize secondary cursors by
00186          * sorting in order of increasing cardinality.
00187          */
00188         if (!LF_ISSET(DB_JOIN_NOSORT))
00189                 qsort(jc->j_curslist, ncurs, sizeof(DBC *), __db_join_cmp);
00190 
00191         /*
00192          * We never need to reset the 0th cursor, so there's no
00193          * solid reason to use workcurs[0] rather than curslist[0] in
00194          * join_get.  Nonetheless, it feels cleaner to do it for symmetry,
00195          * and this is the most logical place to copy it.
00196          *
00197          * !!!
00198          * There's no need to close the new cursor if we goto err only
00199          * because this is the last thing that can fail.  Modifier of this
00200          * function beware!
00201          */
00202         if ((ret =
00203             __db_c_dup(jc->j_curslist[0], jc->j_workcurs, DB_POSITION)) != 0)
00204                 goto err;
00205 
00206         dbc->c_close = __db_join_close_pp;
00207         dbc->c_del = __db_join_del;
00208         dbc->c_get = __db_join_get_pp;
00209         dbc->c_put = __db_join_put;
00210         dbc->internal = (DBC_INTERNAL *)jc;
00211         dbc->dbp = primary;
00212         jc->j_primary = primary;
00213 
00214         /* Stash the first cursor's transaction here for easy access. */
00215         dbc->txn = curslist[0]->txn;
00216 
00217         *dbcp = dbc;
00218 
00219         MUTEX_LOCK(dbenv, primary->mutex);
00220         TAILQ_INSERT_TAIL(&primary->join_queue, dbc, links);
00221         MUTEX_UNLOCK(dbenv, primary->mutex);
00222 
00223         return (0);
00224 
00225 err:    if (jc != NULL) {
00226                 if (jc->j_curslist != NULL)
00227                         __os_free(dbenv, jc->j_curslist);
00228                 if (jc->j_workcurs != NULL) {
00229                         if (jc->j_workcurs[0] != NULL)
00230                                 (void)__db_c_close(jc->j_workcurs[0]);
00231                         __os_free(dbenv, jc->j_workcurs);
00232                 }
00233                 if (jc->j_fdupcurs != NULL)
00234                         __os_free(dbenv, jc->j_fdupcurs);
00235                 if (jc->j_exhausted != NULL)
00236                         __os_free(dbenv, jc->j_exhausted);
00237                 __os_free(dbenv, jc);
00238         }
00239         if (dbc != NULL)
00240                 __os_free(dbenv, dbc);
00241         return (ret);
00242 }
00243 
00244 /*
00245  * __db_join_close_pp --
00246  *      DBC->c_close pre/post processing for join cursors.
00247  */
00248 static int
00249 __db_join_close_pp(dbc)
00250         DBC *dbc;
00251 {
00252         DB_ENV *dbenv;
00253         DB_THREAD_INFO *ip;
00254         DB *dbp;
00255         int handle_check, ret, t_ret;
00256 
00257         dbp = dbc->dbp;
00258         dbenv = dbp->dbenv;
00259 
00260         PANIC_CHECK(dbenv);
00261 
00262         ENV_ENTER(dbenv, ip);
00263 
00264         handle_check = IS_ENV_REPLICATED(dbenv);
00265         if (handle_check &&
00266             (ret = __db_rep_enter(dbp, 1, 0, dbc->txn != NULL)) != 0) {
00267                 handle_check = 0;
00268                 goto err;
00269         }
00270 
00271         ret = __db_join_close(dbc);
00272 
00273         if (handle_check && (t_ret = __env_db_rep_exit(dbenv)) != 0 && ret == 0)
00274                 ret = t_ret;
00275 
00276 err:    ENV_LEAVE(dbenv, ip);
00277         return (ret);
00278 }
00279 
00280 static int
00281 __db_join_put(dbc, key, data, flags)
00282         DBC *dbc;
00283         DBT *key;
00284         DBT *data;
00285         u_int32_t flags;
00286 {
00287         PANIC_CHECK(dbc->dbp->dbenv);
00288 
00289         COMPQUIET(key, NULL);
00290         COMPQUIET(data, NULL);
00291         COMPQUIET(flags, 0);
00292         return (EINVAL);
00293 }
00294 
00295 static int
00296 __db_join_del(dbc, flags)
00297         DBC *dbc;
00298         u_int32_t flags;
00299 {
00300         PANIC_CHECK(dbc->dbp->dbenv);
00301 
00302         COMPQUIET(flags, 0);
00303         return (EINVAL);
00304 }
00305 
00306 /*
00307  * __db_join_get_pp --
00308  *      DBjoin->get pre/post processing.
00309  */
00310 static int
00311 __db_join_get_pp(dbc, key, data, flags)
00312         DBC *dbc;
00313         DBT *key, *data;
00314         u_int32_t flags;
00315 {
00316         DB *dbp;
00317         DB_ENV *dbenv;
00318         DB_THREAD_INFO *ip;
00319         u_int32_t handle_check, save_flags;
00320         int ret, t_ret;
00321 
00322         dbp = dbc->dbp;
00323         dbenv = dbp->dbenv;
00324 
00325         /* Save the original flags value. */
00326         save_flags = flags;
00327 
00328         PANIC_CHECK(dbenv);
00329 
00330         if (LF_ISSET(DB_READ_COMMITTED | DB_READ_UNCOMMITTED | DB_RMW)) {
00331                 if (!LOCKING_ON(dbp->dbenv))
00332                         return (__db_fnl(dbp->dbenv, "DBcursor->c_get"));
00333                 LF_CLR(DB_READ_COMMITTED | DB_READ_UNCOMMITTED | DB_RMW);
00334         }
00335 
00336         switch (flags) {
00337         case 0:
00338         case DB_JOIN_ITEM:
00339                 break;
00340         default:
00341                 return (__db_ferr(dbp->dbenv, "DBcursor->c_get", 0));
00342         }
00343 
00344         /*
00345          * A partial get of the key of a join cursor don't make much sense;
00346          * the entire key is necessary to query the primary database
00347          * and find the datum, and so regardless of the size of the key
00348          * it would not be a performance improvement.  Since it would require
00349          * special handling, we simply disallow it.
00350          *
00351          * A partial get of the data, however, potentially makes sense (if
00352          * all possible data are a predictable large structure, for instance)
00353          * and causes us no headaches, so we permit it.
00354          */
00355         if (F_ISSET(key, DB_DBT_PARTIAL)) {
00356                 __db_err(dbp->dbenv,
00357                     "DB_DBT_PARTIAL may not be set on key during join_get");
00358                 return (EINVAL);
00359         }
00360 
00361         ENV_ENTER(dbenv, ip);
00362 
00363         handle_check = IS_ENV_REPLICATED(dbp->dbenv);
00364         if (handle_check &&
00365             (ret = __db_rep_enter(dbp, 1, 0, dbc->txn != NULL)) != 0) {
00366                 handle_check = 0;
00367                 goto err;
00368         }
00369 
00370         /* Restore the original flags value. */
00371         flags = save_flags;
00372 
00373         ret = __db_join_get(dbc, key, data, flags);
00374 
00375         if (handle_check && (t_ret = __env_db_rep_exit(dbenv)) != 0 && ret == 0)
00376                 ret = t_ret;
00377 
00378 err:    ENV_LEAVE(dbenv, ip);
00379         return (ret);
00380 }
00381 
00382 static int
00383 __db_join_get(dbc, key_arg, data_arg, flags)
00384         DBC *dbc;
00385         DBT *key_arg, *data_arg;
00386         u_int32_t flags;
00387 {
00388         DBT *key_n, key_n_mem;
00389         DB *dbp;
00390         DBC *cp;
00391         JOIN_CURSOR *jc;
00392         int db_manage_data, ret;
00393         u_int32_t i, j, operation, opmods;
00394 
00395         dbp = dbc->dbp;
00396         jc = (JOIN_CURSOR *)dbc->internal;
00397 
00398         operation = LF_ISSET(DB_OPFLAGS_MASK);
00399 
00400         /* !!!
00401          * If the set of flags here changes, check that __db_join_primget
00402          * is updated to handle them properly.
00403          */
00404         opmods = LF_ISSET(DB_READ_COMMITTED | DB_READ_UNCOMMITTED | DB_RMW);
00405 
00406         /*
00407          * Since we are fetching the key as a datum in the secondary indices,
00408          * we must be careful of caller-specified DB_DBT_* memory
00409          * management flags.  If necessary, use a stack-allocated DBT;
00410          * we'll appropriately copy and/or allocate the data later.
00411          */
00412         if (F_ISSET(key_arg, DB_DBT_USERMEM) ||
00413             F_ISSET(key_arg, DB_DBT_MALLOC)) {
00414                 /* We just use the default buffer;  no need to go malloc. */
00415                 key_n = &key_n_mem;
00416                 memset(key_n, 0, sizeof(DBT));
00417         } else {
00418                 /*
00419                  * Either DB_DBT_REALLOC or the default buffer will work
00420                  * fine if we have to reuse it, as we do.
00421                  */
00422                 key_n = key_arg;
00423         }
00424 
00425         /*
00426          * If our last attempt to do a get on the primary key failed,
00427          * short-circuit the join and try again with the same key.
00428          */
00429         if (F_ISSET(jc, JOIN_RETRY))
00430                 goto samekey;
00431         F_CLR(jc, JOIN_RETRY);
00432 
00433 retry:  ret = __db_c_get(jc->j_workcurs[0], &jc->j_key, key_n,
00434             opmods | (jc->j_exhausted[0] ? DB_NEXT_DUP : DB_CURRENT));
00435 
00436         if (ret == DB_BUFFER_SMALL) {
00437                 jc->j_key.ulen <<= 1;
00438                 if ((ret = __os_realloc(dbp->dbenv,
00439                     jc->j_key.ulen, &jc->j_key.data)) != 0)
00440                         goto mem_err;
00441                 goto retry;
00442         }
00443 
00444         /*
00445          * If ret == DB_NOTFOUND, we're out of elements of the first
00446          * secondary cursor.  This is how we finally finish the join
00447          * if all goes well.
00448          */
00449         if (ret != 0)
00450                 goto err;
00451 
00452         /*
00453          * If jc->j_exhausted[0] == 1, we've just advanced the first cursor,
00454          * and we're going to want to advance all the cursors that point to
00455          * the first member of a duplicate duplicate set (j_fdupcurs[1..N]).
00456          * Close all the cursors in j_fdupcurs;  we'll reopen them the
00457          * first time through the upcoming loop.
00458          */
00459         for (i = 1; i < jc->j_ncurs; i++) {
00460                 if (jc->j_fdupcurs[i] != NULL &&
00461                     (ret = __db_c_close(jc->j_fdupcurs[i])) != 0)
00462                         goto err;
00463                 jc->j_fdupcurs[i] = NULL;
00464         }
00465 
00466         /*
00467          * If jc->j_curslist[1] == NULL, we have only one cursor in the join.
00468          * Thus, we can safely increment that one cursor on each call
00469          * to __db_join_get, and we signal this by setting jc->j_exhausted[0]
00470          * right away.
00471          *
00472          * Otherwise, reset jc->j_exhausted[0] to 0, so that we don't
00473          * increment it until we know we're ready to.
00474          */
00475         if (jc->j_curslist[1] == NULL)
00476                 jc->j_exhausted[0] = 1;
00477         else
00478                 jc->j_exhausted[0] = 0;
00479 
00480         /* We have the first element; now look for it in the other cursors. */
00481         for (i = 1; i < jc->j_ncurs; i++) {
00482                 DB_ASSERT(jc->j_curslist[i] != NULL);
00483                 if (jc->j_workcurs[i] == NULL)
00484                         /* If this is NULL, we need to dup curslist into it. */
00485                         if ((ret = __db_c_dup(jc->j_curslist[i],
00486                             &jc->j_workcurs[i], DB_POSITION)) != 0)
00487                                 goto err;
00488 
00489 retry2:         cp = jc->j_workcurs[i];
00490 
00491                 if ((ret = __db_join_getnext(cp, &jc->j_key, key_n,
00492                             jc->j_exhausted[i], opmods)) == DB_NOTFOUND) {
00493                         /*
00494                          * jc->j_workcurs[i] has no more of the datum we're
00495                          * interested in.  Go back one cursor and get
00496                          * a new dup.  We can't just move to a new
00497                          * element of the outer relation, because that way
00498                          * we might miss duplicate duplicates in cursor i-1.
00499                          *
00500                          * If this takes us back to the first cursor,
00501                          * -then- we can move to a new element of the outer
00502                          * relation.
00503                          */
00504                         --i;
00505                         jc->j_exhausted[i] = 1;
00506 
00507                         if (i == 0) {
00508                                 for (j = 1; jc->j_workcurs[j] != NULL; j++) {
00509                                         /*
00510                                          * We're moving to a new element of
00511                                          * the first secondary cursor.  If
00512                                          * that cursor is sorted, then any
00513                                          * other sorted cursors can be safely
00514                                          * reset to the first duplicate
00515                                          * duplicate in the current set if we
00516                                          * have a pointer to it (we can't just
00517                                          * leave them be, or we'll miss
00518                                          * duplicate duplicates in the outer
00519                                          * relation).
00520                                          *
00521                                          * If the first cursor is unsorted, or
00522                                          * if cursor j is unsorted, we can
00523                                          * make no assumptions about what
00524                                          * we're looking for next or where it
00525                                          * will be, so we reset to the very
00526                                          * beginning (setting workcurs NULL
00527                                          * will achieve this next go-round).
00528                                          *
00529                                          * XXX: This is likely to break
00530                                          * horribly if any two cursors are
00531                                          * both sorted, but have different
00532                                          * specified sort functions.  For,
00533                                          * now, we dismiss this as pathology
00534                                          * and let strange things happen--we
00535                                          * can't make rope childproof.
00536                                          */
00537                                         if ((ret = __db_c_close(
00538                                             jc->j_workcurs[j])) != 0)
00539                                                 goto err;
00540                                         if (!SORTED_SET(jc, 0) ||
00541                                             !SORTED_SET(jc, j) ||
00542                                             jc->j_fdupcurs[j] == NULL)
00543                                                 /*
00544                                                  * Unsafe conditions;
00545                                                  * reset fully.
00546                                                  */
00547                                                 jc->j_workcurs[j] = NULL;
00548                                         else
00549                                                 /* Partial reset suffices. */
00550                                                 if ((__db_c_dup(
00551                                                     jc->j_fdupcurs[j],
00552                                                     &jc->j_workcurs[j],
00553                                                     DB_POSITION)) != 0)
00554                                                         goto err;
00555                                         jc->j_exhausted[j] = 0;
00556                                 }
00557                                 goto retry;
00558                                 /* NOTREACHED */
00559                         }
00560 
00561                         /*
00562                          * We're about to advance the cursor and need to
00563                          * reset all of the workcurs[j] where j>i, so that
00564                          * we don't miss any duplicate duplicates.
00565                          */
00566                         for (j = i + 1;
00567                             jc->j_workcurs[j] != NULL;
00568                             j++) {
00569                                 if ((ret =
00570                                     __db_c_close(jc->j_workcurs[j])) != 0)
00571                                         goto err;
00572                                 jc->j_exhausted[j] = 0;
00573                                 if (jc->j_fdupcurs[j] == NULL)
00574                                         jc->j_workcurs[j] = NULL;
00575                                 else if ((ret = __db_c_dup(jc->j_fdupcurs[j],
00576                                     &jc->j_workcurs[j], DB_POSITION)) != 0)
00577                                         goto err;
00578                         }
00579                         goto retry2;
00580                         /* NOTREACHED */
00581                 }
00582 
00583                 if (ret == DB_BUFFER_SMALL) {
00584                         jc->j_key.ulen <<= 1;
00585                         if ((ret = __os_realloc(dbp->dbenv, jc->j_key.ulen,
00586                             &jc->j_key.data)) != 0) {
00587 mem_err:                        __db_err(dbp->dbenv,
00588                                     "Allocation failed for join key, len = %lu",
00589                                     (u_long)jc->j_key.ulen);
00590                                 goto err;
00591                         }
00592                         goto retry2;
00593                 }
00594 
00595                 if (ret != 0)
00596                         goto err;
00597 
00598                 /*
00599                  * If we made it this far, we've found a matching
00600                  * datum in cursor i.  Mark the current cursor
00601                  * unexhausted, so we don't miss any duplicate
00602                  * duplicates the next go-round--unless this is the
00603                  * very last cursor, in which case there are none to
00604                  * miss, and we'll need that exhausted flag to finally
00605                  * get a DB_NOTFOUND and move on to the next datum in
00606                  * the outermost cursor.
00607                  */
00608                 if (i + 1 != jc->j_ncurs)
00609                         jc->j_exhausted[i] = 0;
00610                 else
00611                         jc->j_exhausted[i] = 1;
00612 
00613                 /*
00614                  * If jc->j_fdupcurs[i] is NULL and the ith cursor's dups are
00615                  * sorted, then we're here for the first time since advancing
00616                  * cursor 0, and we have a new datum of interest.
00617                  * jc->j_workcurs[i] points to the beginning of a set of
00618                  * duplicate duplicates;  store this into jc->j_fdupcurs[i].
00619                  */
00620                 if (SORTED_SET(jc, i) && jc->j_fdupcurs[i] == NULL && (ret =
00621                     __db_c_dup(cp, &jc->j_fdupcurs[i], DB_POSITION)) != 0)
00622                         goto err;
00623         }
00624 
00625 err:    if (ret != 0)
00626                 return (ret);
00627 
00628         if (0) {
00629 samekey:        /*
00630                  * Get the key we tried and failed to return last time;
00631                  * it should be the current datum of all the secondary cursors.
00632                  */
00633                 if ((ret = __db_c_get(jc->j_workcurs[0],
00634                     &jc->j_key, key_n, DB_CURRENT | opmods)) != 0)
00635                         return (ret);
00636                 F_CLR(jc, JOIN_RETRY);
00637         }
00638 
00639         /*
00640          * ret == 0;  we have a key to return.
00641          *
00642          * If DB_DBT_USERMEM or DB_DBT_MALLOC is set, we need to copy the key
00643          * back into the dbt we were given for the key; call __db_retcopy.
00644          * Otherwise, assert that we do not need to copy anything and proceed.
00645          */
00646         DB_ASSERT(F_ISSET(
00647             key_arg, DB_DBT_USERMEM | DB_DBT_MALLOC) || key_n == key_arg);
00648 
00649         if (F_ISSET(key_arg, DB_DBT_USERMEM | DB_DBT_MALLOC) &&
00650             (ret = __db_retcopy(dbp->dbenv,
00651             key_arg, key_n->data, key_n->size, NULL, NULL)) != 0) {
00652                 /*
00653                  * The retcopy failed, most commonly because we have a user
00654                  * buffer for the key which is too small. Set things up to
00655                  * retry next time, and return.
00656                  */
00657                 F_SET(jc, JOIN_RETRY);
00658                 return (ret);
00659         }
00660 
00661         /*
00662          * If DB_JOIN_ITEM is set, we return it; otherwise we do the lookup
00663          * in the primary and then return.
00664          *
00665          * Note that we use key_arg here;  it is safe (and appropriate)
00666          * to do so.
00667          */
00668         if (operation == DB_JOIN_ITEM)
00669                 return (0);
00670 
00671         /*
00672          * If data_arg->flags == 0--that is, if DB is managing the
00673          * data DBT's memory--it's not safe to just pass the DBT
00674          * through to the primary get call, since we don't want that
00675          * memory to belong to the primary DB handle (and if the primary
00676          * is free-threaded, it can't anyway).
00677          *
00678          * Instead, use memory that is managed by the join cursor, in
00679          * jc->j_rdata.
00680          */
00681         if (!F_ISSET(data_arg, DB_DBT_MALLOC | DB_DBT_REALLOC | DB_DBT_USERMEM))
00682                 db_manage_data = 1;
00683         else
00684                 db_manage_data = 0;
00685         if ((ret = __db_join_primget(jc->j_primary,
00686             jc->j_curslist[0]->txn, jc->j_curslist[0]->locker, key_arg,
00687             db_manage_data ? &jc->j_rdata : data_arg, opmods)) != 0) {
00688                 if (ret == DB_NOTFOUND)
00689                         /*
00690                          * If ret == DB_NOTFOUND, the primary and secondary
00691                          * are out of sync;  every item in each secondary
00692                          * should correspond to something in the primary,
00693                          * or we shouldn't have done the join this way.
00694                          * Wail.
00695                          */
00696                         ret = __db_secondary_corrupt(jc->j_primary);
00697                 else
00698                         /*
00699                          * The get on the primary failed for some other
00700                          * reason, most commonly because we're using a user
00701                          * buffer that's not big enough.  Flag our failure
00702                          * so we can return the same key next time.
00703                          */
00704                         F_SET(jc, JOIN_RETRY);
00705         }
00706         if (db_manage_data && ret == 0) {
00707                 data_arg->data = jc->j_rdata.data;
00708                 data_arg->size = jc->j_rdata.size;
00709         }
00710 
00711         return (ret);
00712 }
00713 
00714 /*
00715  * __db_join_close --
00716  *      DBC->c_close for join cursors.
00717  *
00718  * PUBLIC: int __db_join_close __P((DBC *));
00719  */
00720 int
00721 __db_join_close(dbc)
00722         DBC *dbc;
00723 {
00724         DB *dbp;
00725         DB_ENV *dbenv;
00726         JOIN_CURSOR *jc;
00727         int ret, t_ret;
00728         u_int32_t i;
00729 
00730         jc = (JOIN_CURSOR *)dbc->internal;
00731         dbp = dbc->dbp;
00732         dbenv = dbp->dbenv;
00733         ret = t_ret = 0;
00734 
00735         /*
00736          * Remove from active list of join cursors.  Note that this
00737          * must happen before any action that can fail and return, or else
00738          * __db_close may loop indefinitely.
00739          */
00740         MUTEX_LOCK(dbenv, dbp->mutex);
00741         TAILQ_REMOVE(&dbp->join_queue, dbc, links);
00742         MUTEX_UNLOCK(dbenv, dbp->mutex);
00743 
00744         PANIC_CHECK(dbenv);
00745 
00746         /*
00747          * Close any open scratch cursors.  In each case, there may
00748          * not be as many outstanding as there are cursors in
00749          * curslist, but we want to close whatever's there.
00750          *
00751          * If any close fails, there's no reason not to close everything else;
00752          * we'll just return the error code of the last one to fail.  There's
00753          * not much the caller can do anyway, since these cursors only exist
00754          * hanging off a db-internal data structure that they shouldn't be
00755          * mucking with.
00756          */
00757         for (i = 0; i < jc->j_ncurs; i++) {
00758                 if (jc->j_workcurs[i] != NULL &&
00759                     (t_ret = __db_c_close(jc->j_workcurs[i])) != 0)
00760                         ret = t_ret;
00761                 if (jc->j_fdupcurs[i] != NULL &&
00762                     (t_ret = __db_c_close(jc->j_fdupcurs[i])) != 0)
00763                         ret = t_ret;
00764         }
00765 
00766         __os_free(dbenv, jc->j_exhausted);
00767         __os_free(dbenv, jc->j_curslist);
00768         __os_free(dbenv, jc->j_workcurs);
00769         __os_free(dbenv, jc->j_fdupcurs);
00770         __os_free(dbenv, jc->j_key.data);
00771         if (jc->j_rdata.data != NULL)
00772                 __os_ufree(dbenv, jc->j_rdata.data);
00773         __os_free(dbenv, jc);
00774         __os_free(dbenv, dbc);
00775 
00776         return (ret);
00777 }
00778 
00779 /*
00780  * __db_join_getnext --
00781  *      This function replaces the DBC_CONTINUE and DBC_KEYSET
00782  *      functionality inside the various cursor get routines.
00783  *
00784  *      If exhausted == 0, we're not done with the current datum;
00785  *      return it if it matches "matching", otherwise search
00786  *      using DB_GET_BOTHC (which is faster than iteratively doing
00787  *      DB_NEXT_DUP) forward until we find one that does.
00788  *
00789  *      If exhausted == 1, we are done with the current datum, so just
00790  *      leap forward to searching NEXT_DUPs.
00791  *
00792  *      If no matching datum exists, returns DB_NOTFOUND, else 0.
00793  */
00794 static int
00795 __db_join_getnext(dbc, key, data, exhausted, opmods)
00796         DBC *dbc;
00797         DBT *key, *data;
00798         u_int32_t exhausted, opmods;
00799 {
00800         int ret, cmp;
00801         DB *dbp;
00802         DBT ldata;
00803         int (*func) __P((DB *, const DBT *, const DBT *));
00804 
00805         dbp = dbc->dbp;
00806         func = (dbp->dup_compare == NULL) ? __bam_defcmp : dbp->dup_compare;
00807 
00808         switch (exhausted) {
00809         case 0:
00810                 /*
00811                  * We don't want to step on data->data;  use a new
00812                  * DBT and malloc so we don't step on dbc's rdata memory.
00813                  */
00814                 memset(&ldata, 0, sizeof(DBT));
00815                 F_SET(&ldata, DB_DBT_MALLOC);
00816                 if ((ret = __db_c_get(dbc,
00817                     key, &ldata, opmods | DB_CURRENT)) != 0)
00818                         break;
00819                 cmp = func(dbp, data, &ldata);
00820                 if (cmp == 0) {
00821                         /*
00822                          * We have to return the real data value.  Copy
00823                          * it into data, then free the buffer we malloc'ed
00824                          * above.
00825                          */
00826                         if ((ret = __db_retcopy(dbp->dbenv, data, ldata.data,
00827                             ldata.size, &data->data, &data->size)) != 0)
00828                                 return (ret);
00829                         __os_ufree(dbp->dbenv, ldata.data);
00830                         return (0);
00831                 }
00832 
00833                 /*
00834                  * Didn't match--we want to fall through and search future
00835                  * dups.  We just forget about ldata and free
00836                  * its buffer--data contains the value we're searching for.
00837                  */
00838                 __os_ufree(dbp->dbenv, ldata.data);
00839                 /* FALLTHROUGH */
00840         case 1:
00841                 ret = __db_c_get(dbc, key, data, opmods | DB_GET_BOTHC);
00842                 break;
00843         default:
00844                 ret = EINVAL;
00845                 break;
00846         }
00847 
00848         return (ret);
00849 }
00850 
00851 /*
00852  * __db_join_cmp --
00853  *      Comparison function for sorting DBCs in cardinality order.
00854  */
00855 static int
00856 __db_join_cmp(a, b)
00857         const void *a, *b;
00858 {
00859         DBC *dbca, *dbcb;
00860         db_recno_t counta, countb;
00861 
00862         dbca = *((DBC * const *)a);
00863         dbcb = *((DBC * const *)b);
00864 
00865         if (__db_c_count(dbca, &counta) != 0 ||
00866             __db_c_count(dbcb, &countb) != 0)
00867                 return (0);
00868 
00869         return ((long)counta - (long)countb);
00870 }
00871 
00872 /*
00873  * __db_join_primget --
00874  *      Perform a DB->get in the primary, being careful not to use a new
00875  * locker ID if we're doing CDB locking.
00876  */
00877 static int
00878 __db_join_primget(dbp, txn, lockerid, key, data, flags)
00879         DB *dbp;
00880         DB_TXN *txn;
00881         u_int32_t lockerid;
00882         DBT *key, *data;
00883         u_int32_t flags;
00884 {
00885         DBC *dbc;
00886         u_int32_t rmw;
00887         int ret, t_ret;
00888 
00889         if ((ret = __db_cursor_int(dbp,
00890             txn, dbp->type, PGNO_INVALID, 0, lockerid, &dbc)) != 0)
00891                 return (ret);
00892 
00893         /*
00894          * The only allowable flags here are the two flags copied into "opmods"
00895          * in __db_join_get, DB_RMW and DB_READ_UNCOMMITTED.  The former is an
00896          * op on the c_get call, the latter on the cursor call.  It's a DB bug
00897          * if we allow any other flags down in here.
00898          */
00899         rmw = LF_ISSET(DB_RMW);
00900         if (LF_ISSET(DB_READ_UNCOMMITTED) ||
00901             (txn != NULL && F_ISSET(txn, TXN_READ_UNCOMMITTED)))
00902                 F_SET(dbc, DBC_READ_UNCOMMITTED);
00903 
00904         if (LF_ISSET(DB_READ_COMMITTED) ||
00905             (txn != NULL && F_ISSET(txn, TXN_READ_COMMITTED)))
00906                 F_SET(dbc, DBC_READ_COMMITTED);
00907 
00908         LF_CLR(DB_READ_COMMITTED | DB_READ_UNCOMMITTED | DB_RMW);
00909         DB_ASSERT(flags == 0);
00910 
00911         F_SET(dbc, DBC_TRANSIENT);
00912 
00913         /*
00914          * This shouldn't be necessary, thanks to the fact that join cursors
00915          * swap in their own DB_DBT_REALLOC'ed buffers, but just for form's
00916          * sake, we mirror what __db_get does.
00917          */
00918         SET_RET_MEM(dbc, dbp);
00919 
00920         ret = __db_c_get(dbc, key, data, DB_SET | rmw);
00921 
00922         if ((t_ret = __db_c_close(dbc)) != 0 && ret == 0)
00923                 ret = t_ret;
00924 
00925         return (ret);
00926 }
00927 
00928 /*
00929  * __db_secondary_corrupt --
00930  *      Report that a secondary index appears corrupt, as it has a record
00931  * that does not correspond to a record in the primary or vice versa.
00932  *
00933  * PUBLIC: int __db_secondary_corrupt __P((DB *));
00934  */
00935 int
00936 __db_secondary_corrupt(dbp)
00937         DB *dbp;
00938 {
00939         __db_err(dbp->dbenv,
00940             "Secondary index corrupt: not consistent with primary");
00941         return (DB_SECONDARY_BAD);
00942 }

Generated on Sun Dec 25 12:14:20 2005 for Berkeley DB 4.4.16 by  doxygen 1.4.2