backends/remote/remote-database.cc

Go to the documentation of this file.
00001 
00004 /* Copyright (C) 2006,2007 Olly Betts
00005  * Copyright (C) 2007 Lemur Consulting Ltd
00006  *
00007  * This program is free software; you can redistribute it and/or
00008  * modify it under the terms of the GNU General Public License as
00009  * published by the Free Software Foundation; either version 2 of the
00010  * License, or (at your option) any later version.
00011  *
00012  * This program is distributed in the hope that it will be useful,
00013  * but WITHOUT ANY WARRANTY; without even the implied warranty of
00014  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
00015  * GNU General Public License for more details.
00016  *
00017  * You should have received a copy of the GNU General Public License
00018  * along with this program; if not, write to the Free Software
00019  * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301 USA
00020  */
00021 
00022 #include <config.h>
00023 
00024 #include "safeerrno.h"
00025 #include <signal.h>
00026 
00027 #include <xapian/error.h>
00028 
00029 #include "autoptr.h"
00030 #include "emptypostlist.h"
00031 #include "inmemory_positionlist.h"
00032 #include "net_postlist.h"
00033 #include "net_termlist.h"
00034 #include "net_document.h"
00035 #include "omassert.h"
00036 #include "serialise.h"
00037 #include "serialise-double.h"
00038 #include "stats.h"
00039 #include "stringutils.h" // For STRINGIZE().
00040 #include "utils.h"
00041 
00042 #include <string>
00043 #include <vector>
00044 
00045 using namespace std;
00046 
00047 RemoteDatabase::RemoteDatabase(int fd, Xapian::timeout timeout_,
00048                                const string & context_, bool writable)
00049         : link(fd, fd, context_),
00050           context(context_),
00051           cached_stats_valid(),
00052           timeout(timeout_)
00053 {
00054 #ifndef __WIN32__
00055     // It's simplest to just ignore SIGPIPE.  We'll still know if the
00056     // connection dies because we'll get EPIPE back from write().
00057     if (signal(SIGPIPE, SIG_IGN) == SIG_ERR) {
00058         throw Xapian::NetworkError("Couldn't set SIGPIPE to SIG_IGN", errno);
00059     }
00060 #endif
00061 
00062     if (!writable) {
00063         // Transactions only make sense when writing, so flag them as
00064         // "unimplemented" so that our destructor doesn't call dtor_called()
00065         // since that might try to call flush() which will cause a message to
00066         // be sent to the remote server and probably an InvalidOperationError
00067         // exception message to be returned.
00068         transaction_state = TRANSACTION_UNIMPLEMENTED;
00069     }
00070 
00071     string message;
00072     char type = get_message(message);
00073 
00074     if (reply_type(type) != REPLY_GREETING || message.size() < 3) {
00075         if (type == 'O' && message.size() == size_t('M') && message[0] == ' ') {
00076             // The server reply used to start "OM ", which will now be
00077             // interpreted as a type 'O' message of length size_t('M')
00078             // with first character ' '.
00079             throw Xapian::NetworkError("Server protocol version too old", context);
00080         }
00081         throw Xapian::NetworkError("Handshake failed - is this a Xapian server?", context);
00082     }
00083 
00084     const char *p = message.c_str();
00085     const char *p_end = p + message.size();
00086 
00087     // The protocol major versions must match.  The protocol minor version of
00088     // the server must be >= that of the client.
00089     int protocol_major = static_cast<unsigned char>(*p++);
00090     int protocol_minor = static_cast<unsigned char>(*p++);
00091     if (protocol_major != XAPIAN_REMOTE_PROTOCOL_MAJOR_VERSION ||
00092         protocol_minor < XAPIAN_REMOTE_PROTOCOL_MINOR_VERSION) {
00093         string errmsg("Unknown protocol version ");
00094         errmsg += om_tostring(protocol_major);
00095         errmsg += '.';
00096         errmsg += om_tostring(protocol_minor);
00097         errmsg += " ("STRINGIZE(XAPIAN_REMOTE_PROTOCOL_MAJOR_VERSION)"."STRINGIZE(XAPIAN_REMOTE_PROTOCOL_MINOR_VERSION)" supported)";
00098         throw Xapian::NetworkError(errmsg, context);
00099     }
00100 
00101     doccount = decode_length(&p, p_end, false);
00102     lastdocid = decode_length(&p, p_end, false);
00103     if (p == p_end) {
00104         throw Xapian::NetworkError("Bad greeting message received (bool)", context);
00105     }
00106     has_positional_info = (*p++ == '1');
00107     avlength = unserialise_double(&p, p_end);
00108     if (p != p_end || avlength < 0) {
00109         throw Xapian::NetworkError("Bad greeting message received (double)", context);
00110     }
00111 }
00112 
00113 RemoteDatabase *
00114 RemoteDatabase::as_remotedatabase()
00115 {
00116     return this;
00117 }
00118 
00119 void
00120 RemoteDatabase::keep_alive()
00121 {
00122     send_message(MSG_KEEPALIVE, "");
00123     string message;
00124     get_message(message, REPLY_DONE);
00125 }
00126 
00127 TermList *
00128 RemoteDatabase::open_term_list(Xapian::docid did) const
00129 {
00130     if (did == 0) throw Xapian::InvalidArgumentError("Docid 0 invalid");
00131 
00132     // Ensure that avlength and doccount are up-to-date.
00133     if (!cached_stats_valid) update_stats();
00134 
00135     send_message(MSG_TERMLIST, encode_length(did));
00136 
00137     string message;
00138     get_message(message, REPLY_DOCLENGTH);
00139     const char * p = message.c_str();
00140     const char * p_end = p + message.size();
00141     Xapian::doclength doclen = unserialise_double(&p, p_end);
00142     if (p != p_end || doclen < 0) {
00143         throw Xapian::NetworkError("Bad REPLY_DOCLENGTH message received", context);
00144     }
00145 
00146     AutoPtr<NetworkTermList> tlist;
00147     tlist = new NetworkTermList(doclen, doccount,
00148                                 Xapian::Internal::RefCntPtr<const RemoteDatabase>(this),
00149                                 did);
00150     vector<NetworkTermListItem> & items = tlist->items;
00151 
00152     char type;
00153     while ((type = get_message(message)) == REPLY_TERMLIST) {
00154         NetworkTermListItem item;
00155         p = message.data();
00156         p_end = p + message.size();
00157         item.wdf = decode_length(&p, p_end, false);
00158         item.termfreq = decode_length(&p, p_end, false);
00159         item.tname.assign(p, p_end);
00160         items.push_back(item);
00161     }
00162     if (type != REPLY_DONE) {
00163         throw Xapian::NetworkError("Bad message received", context);
00164     }
00165 
00166     tlist->current_position = tlist->items.begin();
00167     return tlist.release();
00168 }
00169 
00170 TermList *
00171 RemoteDatabase::open_allterms(const string & prefix) const {
00172     // Ensure that avlength and doccount are up-to-date.
00173     if (!cached_stats_valid) update_stats();
00174 
00175     send_message(MSG_ALLTERMS, prefix);
00176 
00177     AutoPtr<NetworkTermList> tlist;
00178     tlist = new NetworkTermList(0.0, doccount,
00179                                 Xapian::Internal::RefCntPtr<const RemoteDatabase>(this),
00180                                 0);
00181     vector<NetworkTermListItem> & items = tlist->items;
00182 
00183     string message;
00184     char type;
00185     while ((type = get_message(message)) == REPLY_ALLTERMS) {
00186         NetworkTermListItem item;
00187         const char * p = message.data();
00188         const char * p_end = p + message.size();
00189         item.termfreq = decode_length(&p, p_end, false);
00190         item.tname.assign(p, p_end);
00191         items.push_back(item);
00192     }
00193     if (type != REPLY_DONE) {
00194         throw Xapian::NetworkError("Bad message received", context);
00195     }
00196 
00197     tlist->current_position = tlist->items.begin();
00198     return tlist.release();
00199 }
00200 
00201 LeafPostList *
00202 RemoteDatabase::open_post_list(const string &term) const
00203 {
00204     return new NetworkPostList(Xapian::Internal::RefCntPtr<const RemoteDatabase>(this), term);
00205 }
00206 
00207 Xapian::doccount
00208 RemoteDatabase::read_post_list(const string &term, NetworkPostList & pl) const
00209 {
00210     send_message(MSG_POSTLIST, term);
00211 
00212     string message;
00213     char type;
00214     get_message(message, REPLY_POSTLISTSTART);
00215 
00216     const char * p = message.data();
00217     const char * p_end = p + message.size();
00218     Xapian::doccount termfreq = decode_length(&p, p_end, false);
00219 
00220     while ((type = get_message(message)) == REPLY_POSTLISTITEM) {
00221         pl.append_posting(message);
00222     }
00223     if (type != REPLY_DONE) {
00224         throw Xapian::NetworkError("Bad message received", context);
00225     }
00226 
00227     return termfreq;
00228 }
00229 
00230 PositionList *
00231 RemoteDatabase::open_position_list(Xapian::docid did, const string &term) const
00232 {
00233     send_message(MSG_POSITIONLIST, encode_length(did) + term);
00234 
00235     vector<Xapian::termpos> positions;
00236 
00237     string message;
00238     char type;
00239     Xapian::termpos lastpos = static_cast<Xapian::termpos>(-1);
00240     while ((type = get_message(message)) == REPLY_POSITIONLIST) {
00241         const char * p = message.data();
00242         const char * p_end = p + message.size();
00243         lastpos += decode_length(&p, p_end, false) + 1;
00244         positions.push_back(lastpos);
00245     }
00246     if (type != REPLY_DONE) {
00247         throw Xapian::NetworkError("Bad message received", context);
00248     }
00249 
00250     return new InMemoryPositionList(positions);
00251 }
00252 
00253 bool
00254 RemoteDatabase::has_positions() const
00255 {
00256     if (!cached_stats_valid) update_stats();
00257     return has_positional_info;
00258 }
00259 
00260 void
00261 RemoteDatabase::reopen()
00262 {
00263     update_stats(MSG_REOPEN);
00264 }
00265 
00266 // Currently lazy is only used in three cases, all in multimatch.cc.  One is
00267 // when using a MatchDecider, which we don't support with the remote backend
00268 // currently.  The others are for the sort key and collapse key which in the
00269 // remote cases is fetched during the remote match and passed across with the
00270 // MSet.  So we can safely ignore it here for now without any performance
00271 // penalty.
00272 Xapian::Document::Internal *
00273 RemoteDatabase::open_document(Xapian::docid did, bool /*lazy*/) const
00274 {
00275     if (did == 0) throw Xapian::InvalidArgumentError("Docid 0 invalid");
00276 
00277     send_message(MSG_DOCUMENT, encode_length(did));
00278     string doc_data;
00279     map<Xapian::valueno, string> values;
00280     get_message(doc_data, REPLY_DOCDATA);
00281 
00282     reply_type type;
00283     string message;
00284     while ((type = get_message(message)) == REPLY_VALUE) {
00285         const char * p = message.data();
00286         const char * p_end = p + message.size();
00287         Xapian::valueno valueno = decode_length(&p, p_end, false);
00288         values.insert(make_pair(valueno, string(p, p_end)));
00289     }
00290     if (type != REPLY_DONE) {
00291         throw Xapian::NetworkError("Bad message received", context);
00292     }
00293 
00294     return new NetworkDocument(this, did, doc_data, values);
00295 }
00296 
00297 void
00298 RemoteDatabase::update_stats(message_type msg_code) const
00299 {
00300     send_message(msg_code, "");
00301     string message;
00302     get_message(message, REPLY_UPDATE);
00303     const char * p = message.c_str();
00304     const char * p_end = p + message.size();
00305     doccount = decode_length(&p, p_end, false);
00306     lastdocid = decode_length(&p, p_end, false);
00307     if (p == p_end) {
00308         throw Xapian::NetworkError("Bad REPLY_UPDATE message received", context);
00309     }
00310     has_positional_info = (*p++ == '1');
00311     avlength = unserialise_double(&p, p_end);
00312     if (p != p_end || avlength < 0) {
00313         throw Xapian::NetworkError("Bad REPLY_UPDATE message received", context);
00314     }
00315     cached_stats_valid = true;
00316 }
00317 
00318 Xapian::doccount
00319 RemoteDatabase::get_doccount() const
00320 {
00321     if (!cached_stats_valid) update_stats();
00322     return doccount;
00323 }
00324 
00325 Xapian::docid
00326 RemoteDatabase::get_lastdocid() const
00327 {
00328     if (!cached_stats_valid) update_stats();
00329     return lastdocid;
00330 }
00331 
00332 Xapian::doclength
00333 RemoteDatabase::get_avlength() const
00334 {
00335     if (!cached_stats_valid) update_stats();
00336     return avlength;
00337 }
00338 
00339 bool
00340 RemoteDatabase::term_exists(const string & tname) const
00341 {
00342     Assert(!tname.empty());
00343     send_message(MSG_TERMEXISTS, tname);
00344     string message;
00345     reply_type type = get_message(message);
00346     if (type != REPLY_TERMEXISTS && type != REPLY_TERMDOESNTEXIST) {
00347         throw Xapian::NetworkError("Bad message received", context);
00348     }
00349     return (type == REPLY_TERMEXISTS);
00350 }
00351 
00352 Xapian::doccount
00353 RemoteDatabase::get_termfreq(const string & tname) const
00354 {
00355     Assert(!tname.empty());
00356     send_message(MSG_TERMFREQ, tname);
00357     string message;
00358     get_message(message, REPLY_TERMFREQ);
00359     const char * p = message.data();
00360     const char * p_end = p + message.size();
00361     return decode_length(&p, p_end, false);
00362 }
00363 
00364 Xapian::termcount
00365 RemoteDatabase::get_collection_freq(const string & tname) const
00366 {
00367     Assert(!tname.empty());
00368     send_message(MSG_COLLFREQ, tname);
00369     string message;
00370     get_message(message, REPLY_COLLFREQ);
00371     const char * p = message.data();
00372     const char * p_end = p + message.size();
00373     return decode_length(&p, p_end, false);
00374 }
00375 
00376 Xapian::doclength
00377 RemoteDatabase::get_doclength(Xapian::docid did) const
00378 {
00379     Assert(did != 0);
00380     send_message(MSG_DOCLENGTH, encode_length(did));
00381     string message;
00382     get_message(message, REPLY_DOCLENGTH);
00383     const char * p = message.c_str();
00384     const char * p_end = p + message.size();
00385     Xapian::doclength doclen = unserialise_double(&p, p_end);
00386     if (p != p_end || doclen < 0) {
00387         throw Xapian::NetworkError("Bad REPLY_DOCLENGTH message received", context);
00388     }
00389     return doclen;
00390 }
00391 
00392 reply_type
00393 RemoteDatabase::get_message(string &result, reply_type required_type) const
00394 {
00395     OmTime end_time;
00396     if (timeout) end_time = OmTime::now() + timeout;
00397 
00398     reply_type type = static_cast<reply_type>(link.get_message(result, end_time));
00399     if (type == REPLY_EXCEPTION) {
00400         unserialise_error(result, "REMOTE:", context);
00401     }
00402     if (required_type != REPLY_MAX && type != required_type) {
00403         string errmsg("Expecting reply type ");
00404         errmsg += om_tostring(required_type);
00405         errmsg += ", got ";
00406         errmsg += om_tostring(type);
00407         throw Xapian::NetworkError(errmsg);
00408     }
00409 
00410     return type;
00411 }
00412 
00413 void
00414 RemoteDatabase::send_message(message_type type, const string &message) const
00415 {
00416     OmTime end_time;
00417     if (timeout) end_time = OmTime::now() + timeout;
00418 
00419     link.send_message(static_cast<unsigned char>(type), message, end_time);
00420 }
00421 
00422 void
00423 RemoteDatabase::do_close()
00424 {
00425     // In the constructor, we set transaction_state to
00426     // TRANSACTION_UNIMPLEMENTED if we aren't writable so that we can check
00427     // it here.
00428     bool writable = (transaction_state != TRANSACTION_UNIMPLEMENTED);
00429 
00430     // Only call dtor_called() if we're writable.
00431     if (writable) dtor_called();
00432 
00433     // If we're writable, wait for a confirmation of the close, so we know that
00434     // changes have been written and flushed, and the database write lock
00435     // released.  For the non-writable case, there's no need to wait, so don't
00436     // slow down searching by waiting here.
00437     link.do_close(writable);
00438 }
00439 
00440 void
00441 RemoteDatabase::set_query(const Xapian::Query::Internal *query,
00442                          Xapian::termcount qlen,
00443                          Xapian::valueno collapse_key,
00444                          Xapian::Enquire::docid_order order,
00445                          Xapian::valueno sort_key,
00446                          Xapian::Enquire::Internal::sort_setting sort_by,
00447                          bool sort_value_forward,
00448                          int percent_cutoff, Xapian::weight weight_cutoff,
00449                          const Xapian::Weight *wtscheme,
00450                          const Xapian::RSet &omrset)
00451 {
00452     string tmp = query->serialise();
00453     string message = encode_length(tmp.size());
00454     message += tmp;
00455 
00456     // Serialise assorted Enquire settings.
00457     message += encode_length(qlen);
00458     message += encode_length(collapse_key);
00459     message += char('0' + order);
00460     message += encode_length(sort_key);
00461     message += char('0' + sort_by);
00462     message += char('0' + sort_value_forward);
00463     message += char(percent_cutoff);
00464     message += serialise_double(weight_cutoff);
00465 
00466     tmp = wtscheme->name();
00467     message += encode_length(tmp.size());
00468     message += tmp;
00469 
00470     tmp = wtscheme->serialise();
00471     message += encode_length(tmp.size());
00472     message += tmp;
00473 
00474     message += serialise_rset(omrset);
00475 
00476     send_message(MSG_QUERY, message);
00477 }
00478 
00479 bool
00480 RemoteDatabase::get_remote_stats(bool nowait, Stats &out)
00481 {
00482     if (nowait && !link.ready_to_read()) return false;
00483 
00484     string message;
00485     get_message(message, REPLY_STATS);
00486     out = unserialise_stats(message);
00487 
00488     return true;
00489 }
00490 
00491 void
00492 RemoteDatabase::send_global_stats(Xapian::doccount first,
00493                                 Xapian::doccount maxitems,
00494                                 Xapian::doccount check_at_least,
00495                                 const Stats &stats)
00496 {
00497     string message = encode_length(first);
00498     message += encode_length(maxitems);
00499     message += encode_length(check_at_least);
00500     message += serialise_stats(stats);
00501     send_message(MSG_GETMSET, message);
00502 }
00503 
00504 void
00505 RemoteDatabase::get_mset(Xapian::MSet &mset)
00506 {
00507     string message;
00508     get_message(message, REPLY_RESULTS);
00509     mset = unserialise_mset(message);
00510 }
00511 
00512 void
00513 RemoteDatabase::flush()
00514 {
00515     send_message(MSG_FLUSH, "");
00516 
00517     // We need to wait for a response to ensure documents have been committed.
00518     string message;
00519     get_message(message, REPLY_DONE);
00520 }
00521 
00522 void
00523 RemoteDatabase::cancel()
00524 {
00525     cached_stats_valid = false;
00526 
00527     send_message(MSG_CANCEL, "");
00528 }
00529 
00530 Xapian::docid
00531 RemoteDatabase::add_document(const Xapian::Document & doc)
00532 {
00533     cached_stats_valid = false;
00534 
00535     send_message(MSG_ADDDOCUMENT, serialise_document(doc));
00536 
00537     string message;
00538     get_message(message, REPLY_ADDDOCUMENT);
00539 
00540     const char * p = message.data();
00541     const char * p_end = p + message.size();
00542     return decode_length(&p, p_end, false);
00543 }
00544 
00545 void
00546 RemoteDatabase::delete_document(Xapian::docid did)
00547 {
00548     cached_stats_valid = false;
00549 
00550 //    send_message(MSG_DELETEDOCUMENT_PRE_30_2, encode_length(did));
00551     send_message(MSG_DELETEDOCUMENT, encode_length(did));
00552     string dummy;
00553     get_message(dummy, REPLY_DONE);
00554 }
00555 
00556 void
00557 RemoteDatabase::delete_document(const std::string & unique_term)
00558 {
00559     cached_stats_valid = false;
00560 
00561     send_message(MSG_DELETEDOCUMENTTERM, unique_term);
00562 }
00563 
00564 void
00565 RemoteDatabase::replace_document(Xapian::docid did,
00566                                  const Xapian::Document & doc)
00567 {
00568     cached_stats_valid = false;
00569 
00570     string message = encode_length(did);
00571     message += serialise_document(doc);
00572 
00573     send_message(MSG_REPLACEDOCUMENT, message);
00574 }
00575 
00576 Xapian::docid
00577 RemoteDatabase::replace_document(const std::string & unique_term,
00578                                  const Xapian::Document & doc)
00579 {
00580     cached_stats_valid = false;
00581 
00582     string message = encode_length(unique_term.size());
00583     message += unique_term;
00584     message += serialise_document(doc);
00585 
00586     send_message(MSG_REPLACEDOCUMENTTERM, message);
00587 
00588     get_message(message, REPLY_ADDDOCUMENT);
00589 
00590     const char * p = message.data();
00591     const char * p_end = p + message.size();
00592     return decode_length(&p, p_end, false);
00593 }

Documentation for Xapian (version 1.0.10).
Generated on 24 Dec 2008 by Doxygen 1.5.2.