net/remoteserver.cc

Go to the documentation of this file.
00001 
00004 /* Copyright (C) 2006,2007,2008 Olly Betts
00005  * Copyright (C) 2006,2007 Lemur Consulting Ltd
00006  *
00007  * This program is free software; you can redistribute it and/or modify
00008  * it under the terms of the GNU General Public License as published by
00009  * the Free Software Foundation; either version 2 of the License, or
00010  * (at your option) any later version.
00011  *
00012  * This program is distributed in the hope that it will be useful,
00013  * but WITHOUT ANY WARRANTY; without even the implied warranty of
00014  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
00015  * GNU General Public License for more details.
00016  *
00017  * You should have received a copy of the GNU General Public License
00018  * along with this program; if not, write to the Free Software
00019  * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301 USA
00020  */
00021 
00022 #include <config.h>
00023 #include "remoteserver.h"
00024 
00025 #include <xapian/database.h>
00026 #include <xapian/enquire.h>
00027 #include <xapian/error.h>
00028 #include <xapian/valueiterator.h>
00029 
00030 #include "safeerrno.h"
00031 #include <signal.h>
00032 #include <stdlib.h>
00033 
00034 #include "autoptr.h"
00035 #include "multimatch.h"
00036 #include "omassert.h"
00037 #include "omtime.h"
00038 #include "serialise.h"
00039 #include "serialise-double.h"
00040 #include "stats.h"
00041 #include "utils.h"
00042 
/// Thrown by RemoteServer::get_message() when the client sends MSG_SHUTDOWN;
/// RemoteServer::run() catches it and returns.
struct ConnectionClosed {};
00045 
00046 RemoteServer::RemoteServer(const std::vector<std::string> &dbpaths,
00047                            int fdin_, int fdout_,
00048                            Xapian::timeout active_timeout_,
00049                            Xapian::timeout idle_timeout_,
00050                            bool writable)
00051     : RemoteConnection(fdin_, fdout_, ""),
00052       db(NULL), wdb(NULL),
00053       active_timeout(active_timeout_), idle_timeout(idle_timeout_)
00054 {
00055     // Catch errors opening the database and propagate them to the client.
00056     try {
00057         if (writable) {
00058             AssertEq(dbpaths.size(), 1); // Expecting exactly one database.
00059             wdb = new Xapian::WritableDatabase(dbpaths[0], Xapian::DB_CREATE_OR_OPEN);
00060             db = wdb;
00061         } else {
00062             db = new Xapian::Database;
00063             vector<std::string>::const_iterator i;
00064             for (i = dbpaths.begin(); i != dbpaths.end(); ++i)
00065                 db->add_database(Xapian::Database(*i));
00066         }
00067 
00068         // Build a better description than Database::get_description() gives.
00069         // FIXME: improve Database::get_description() and then just use that
00070         // instead.
00071         context = dbpaths[0];
00072         vector<std::string>::const_iterator i(dbpaths.begin());
00073         for (++i; i != dbpaths.end(); ++i) {
00074             context += ' ';
00075             context += *i;
00076         }
00077     } catch (const Xapian::Error &err) {
00078         // Propagate the exception to the client.
00079         send_message(REPLY_EXCEPTION, serialise_error(err));
00080         // And rethrow it so our caller can log it and close the connection.
00081         throw;
00082     }
00083 
00084 #ifndef __WIN32__
00085     // It's simplest to just ignore SIGPIPE.  We'll still know if the
00086     // connection dies because we'll get EPIPE back from write().
00087     if (signal(SIGPIPE, SIG_IGN) == SIG_ERR)
00088         throw Xapian::NetworkError("Couldn't set SIGPIPE to SIG_IGN", errno);
00089 #endif
00090 
00091     // Send greeting message.
00092     string message;
00093     message += char(XAPIAN_REMOTE_PROTOCOL_MAJOR_VERSION);
00094     message += char(XAPIAN_REMOTE_PROTOCOL_MINOR_VERSION);
00095     message += encode_length(db->get_doccount());
00096     message += encode_length(db->get_lastdocid());
00097     message += (db->has_positions() ? '1' : '0');
00098     message += serialise_double(db->get_avlength());
00099     send_message(REPLY_GREETING, message);
00100 
00101     // Register weighting schemes.
00102     Xapian::Weight * weight;
00103     weight = new Xapian::BM25Weight();
00104     wtschemes[weight->name()] = weight;
00105     weight = new Xapian::BoolWeight();
00106     wtschemes[weight->name()] = weight;
00107     weight = new Xapian::TradWeight();
00108     wtschemes[weight->name()] = weight;
00109 }
00110 
00111 RemoteServer::~RemoteServer()
00112 {
00113     delete db;
00114     // wdb is either NULL or equal to db, so we shouldn't delete it too!
00115 
00116     map<string, Xapian::Weight*>::const_iterator i;
00117     for (i = wtschemes.begin(); i != wtschemes.end(); ++i) {
00118         delete i->second;
00119     }
00120 }
00121 
00122 message_type
00123 RemoteServer::get_message(Xapian::timeout timeout, string & result,
00124                           message_type required_type)
00125 {
00126     unsigned int type;
00127     OmTime end_time;
00128     if (timeout)
00129         end_time = OmTime::now() + timeout;
00130     type = RemoteConnection::get_message(result, end_time);
00131 
00132     // Handle "shutdown connection" message here.
00133     if (type == MSG_SHUTDOWN) throw ConnectionClosed();
00134     if (type >= MSG_MAX) {
00135         string errmsg("Invalid message type ");
00136         errmsg += om_tostring(type);
00137         throw Xapian::NetworkError(errmsg);
00138     }
00139     if (required_type != MSG_MAX && type != unsigned(required_type)) {
00140         string errmsg("Expecting message type ");
00141         errmsg += om_tostring(required_type);
00142         errmsg += ", got ";
00143         errmsg += om_tostring(type);
00144         throw Xapian::NetworkError(errmsg);
00145     }
00146     return static_cast<message_type>(type);
00147 }
00148 
00149 void
00150 RemoteServer::send_message(reply_type type, const string &message)
00151 {
00152     OmTime end_time;
00153     if (active_timeout)
00154         end_time = OmTime::now() + active_timeout;
00155     unsigned char type_as_char = static_cast<unsigned char>(type);
00156     RemoteConnection::send_message(type_as_char, message, end_time);
00157 }
00158 
00159 typedef void (RemoteServer::* dispatch_func)(const string &);
00160 
00161 void
00162 RemoteServer::run()
00163 {
00164     while (true) {
00165         try {
00166             /* This list needs to be kept in the same order as the list of
00167              * message types in "remoteprotocol.h". Note that messages at the
00168              * end of the list in "remoteprotocol.h" can be omitted if they
00169              * don't correspond to dispatch actions.
00170              */
00171             static const dispatch_func dispatch[] = {
00172                 &RemoteServer::msg_allterms,
00173                 &RemoteServer::msg_collfreq,
00174                 &RemoteServer::msg_document,
00175                 &RemoteServer::msg_termexists,
00176                 &RemoteServer::msg_termfreq,
00177                 &RemoteServer::msg_keepalive,
00178                 &RemoteServer::msg_doclength,
00179                 &RemoteServer::msg_query,
00180                 &RemoteServer::msg_termlist,
00181                 &RemoteServer::msg_positionlist,
00182                 &RemoteServer::msg_postlist,
00183                 &RemoteServer::msg_reopen,
00184                 &RemoteServer::msg_update,
00185                 &RemoteServer::msg_adddocument,
00186                 &RemoteServer::msg_cancel,
00187                 &RemoteServer::msg_deletedocument_pre_30_2,
00188                 &RemoteServer::msg_deletedocumentterm,
00189                 &RemoteServer::msg_flush,
00190                 &RemoteServer::msg_replacedocument,
00191                 &RemoteServer::msg_replacedocumentterm,
00192                 NULL, // MSG_GETMSET - used during a conversation.
00193                 NULL, // MSG_SHUTDOWN - handled by get_message().
00194                 &RemoteServer::msg_deletedocument
00195             };
00196 
00197             string message;
00198             size_t type = get_message(idle_timeout, message);
00199             if (type >= sizeof(dispatch)/sizeof(dispatch[0]) ||
00200                 dispatch[type] == NULL) {
00201                 string errmsg("Unexpected message type ");
00202                 errmsg += om_tostring(type);
00203                 throw Xapian::InvalidArgumentError(errmsg);
00204             }
00205             (this->*(dispatch[type]))(message);
00206         } catch (const Xapian::NetworkTimeoutError & e) {
00207             try {
00208                 // We've had a timeout, so the client may not be listening, so
00209                 // if we can't send the message right away, just exit and the
00210                 // client will cope.
00211                 RemoteConnection::send_message(REPLY_EXCEPTION, serialise_error(e), OmTime::now());
00212             } catch (...) {
00213             }
00214             // And rethrow it so our caller can log it and close the
00215             // connection.
00216             throw;
00217         } catch (const Xapian::NetworkError) {
00218             // All other network errors mean we are fatally confused and are
00219             // unlikely to be able to communicate further across this
00220             // connection.  So we don't try to propagate the error to the
00221             // client, but instead just rethrow the exception so our caller can
00222             // log it and close the connection.
00223             throw;
00224         } catch (const Xapian::Error &e) {
00225             // Propagate the exception to the client, then return to the main
00226             // message handling loop.
00227             send_message(REPLY_EXCEPTION, serialise_error(e));
00228         } catch (ConnectionClosed &) {
00229             return;
00230         } catch (...) {
00231             // Propagate an unknown exception to the client.
00232             send_message(REPLY_EXCEPTION, "");
00233             // And rethrow it so our caller can log it and close the
00234             // connection.
00235             throw;
00236         }
00237     }
00238 }
00239 
00240 void
00241 RemoteServer::msg_allterms(const string &message)
00242 {
00243     const char *p = message.data();
00244     const char *p_end = p + message.size();
00245     string prefix(p, p_end - p);
00246 
00247     const Xapian::TermIterator end = db->allterms_end(prefix);
00248     for (Xapian::TermIterator t = db->allterms_begin(prefix); t != end; ++t) {
00249         string item = encode_length(t.get_termfreq());
00250         item += *t;
00251         send_message(REPLY_ALLTERMS, item);
00252     }
00253 
00254     send_message(REPLY_DONE, "");
00255 }
00256 
00257 void
00258 RemoteServer::msg_termlist(const string &message)
00259 {
00260     const char *p = message.data();
00261     const char *p_end = p + message.size();
00262     Xapian::docid did = decode_length(&p, p_end, false);
00263 
00264     send_message(REPLY_DOCLENGTH, serialise_double(db->get_doclength(did)));
00265     const Xapian::TermIterator end = db->termlist_end(did);
00266     for (Xapian::TermIterator t = db->termlist_begin(did); t != end; ++t) {
00267         string item = encode_length(t.get_wdf());
00268         item += encode_length(t.get_termfreq());
00269         item += *t;
00270         send_message(REPLY_TERMLIST, item);
00271     }
00272 
00273     send_message(REPLY_DONE, "");
00274 }
00275 
00276 void
00277 RemoteServer::msg_positionlist(const string &message)
00278 {
00279     const char *p = message.data();
00280     const char *p_end = p + message.size();
00281     Xapian::docid did = decode_length(&p, p_end, false);
00282     string term(p, p_end - p);
00283 
00284     Xapian::termpos lastpos = static_cast<Xapian::termpos>(-1);
00285     const Xapian::PositionIterator end = db->positionlist_end(did, term);
00286     for (Xapian::PositionIterator i = db->positionlist_begin(did, term);
00287          i != end; ++i) {
00288         Xapian::termpos pos = *i;
00289         send_message(REPLY_POSITIONLIST, encode_length(pos - lastpos - 1));
00290         lastpos = pos;
00291     }
00292 
00293     send_message(REPLY_DONE, "");
00294 }
00295 
00296 void
00297 RemoteServer::msg_postlist(const string &message)
00298 {
00299     const char *p = message.data();
00300     const char *p_end = p + message.size();
00301     string term(p, p_end - p);
00302 
00303     Xapian::doccount termfreq = db->get_termfreq(term);
00304     Xapian::termcount collfreq = db->get_collection_freq(term);
00305     send_message(REPLY_POSTLISTSTART, encode_length(termfreq) + encode_length(collfreq));
00306 
00307     Xapian::docid lastdocid = 0;
00308     const Xapian::PostingIterator end = db->postlist_end(term);
00309     for (Xapian::PostingIterator i = db->postlist_begin(term);
00310          i != end; ++i) {
00311 
00312         Xapian::docid newdocid = *i;
00313         string reply = encode_length(newdocid - lastdocid - 1);
00314         reply += encode_length(i.get_wdf());
00315         // FIXME: get_doclength should always return an integer value, but
00316         // Xapian::doclength is a double.  We could improve the compression
00317         // here by casting to an int and serialising that instead, but it's
00318         // probably not worth doing since the plan is to stop storing the
00319         // document length in the posting lists anyway, at which point the
00320         // remote protocol should stop passing it since it will be more
00321         // expensive to do so.
00322         reply += serialise_double(i.get_doclength());
00323 
00324         send_message(REPLY_POSTLISTITEM, reply);
00325         lastdocid = newdocid;
00326     }
00327 
00328     send_message(REPLY_DONE, "");
00329 }
00330 
00331 void
00332 RemoteServer::msg_reopen(const string & msg)
00333 {
00334     db->reopen();
00335     msg_update(msg);
00336 }
00337 
00338 void
00339 RemoteServer::msg_update(const string &)
00340 {
00341     // reopen() doesn't do anything for a WritableDatabase, so there's
00342     // no harm in calling it unconditionally.
00343     db->reopen();
00344 
00345     string message = encode_length(db->get_doccount());
00346     message += encode_length(db->get_lastdocid());
00347     message += (db->has_positions() ? '1' : '0');
00348     message += serialise_double(db->get_avlength());
00349     send_message(REPLY_UPDATE, message);
00350 }
00351 
00352 void
00353 RemoteServer::msg_query(const string &message_in)
00354 {
00355     const char *p = message_in.c_str();
00356     const char *p_end = p + message_in.size();
00357     size_t len;
00358 
00359     // Unserialise the Query.
00360     len = decode_length(&p, p_end, true);
00361     AutoPtr<Xapian::Query::Internal> query(Xapian::Query::Internal::unserialise(string(p, len)));
00362     p += len;
00363 
00364     // Unserialise assorted Enquire settings.
00365     Xapian::termcount qlen = decode_length(&p, p_end, false);
00366 
00367     Xapian::valueno collapse_key = decode_length(&p, p_end, false);
00368 
00369     if (p_end - p < 4 || *p < '0' || *p > '2') {
00370         throw Xapian::NetworkError("bad message (docid_order)");
00371     }
00372     Xapian::Enquire::docid_order order;
00373     order = static_cast<Xapian::Enquire::docid_order>(*p++ - '0');
00374 
00375     Xapian::valueno sort_key = decode_length(&p, p_end, false);
00376 
00377     if (*p < '0' || *p > '3') {
00378         throw Xapian::NetworkError("bad message (sort_by)");
00379     }
00380     Xapian::Enquire::Internal::sort_setting sort_by;
00381     sort_by = static_cast<Xapian::Enquire::Internal::sort_setting>(*p++ - '0');
00382 
00383     if (*p < '0' || *p > '1') {
00384         throw Xapian::NetworkError("bad message (sort_value_forward)");
00385     }
00386     bool sort_value_forward(*p++ != '0');
00387 
00388     int percent_cutoff = *p++;
00389     if (percent_cutoff < 0 || percent_cutoff > 100) {
00390         throw Xapian::NetworkError("bad message (percent_cutoff)");
00391     }
00392 
00393     Xapian::weight weight_cutoff = unserialise_double(&p, p_end);
00394     if (weight_cutoff < 0) {
00395         throw Xapian::NetworkError("bad message (weight_cutoff)");
00396     }
00397 
00398     // Unserialise the Weight object.
00399     len = decode_length(&p, p_end, true);
00400     map<string, Xapian::Weight *>::const_iterator i;
00401     i = wtschemes.find(string(p, len));
00402     if (i == wtschemes.end()) {
00403         throw Xapian::InvalidArgumentError("Weighting scheme " + string(p, len) + " not registered");
00404     }
00405     p += len;
00406 
00407     len = decode_length(&p, p_end, true);
00408     AutoPtr<Xapian::Weight> wt(i->second->unserialise(string(p, len)));
00409     p += len;
00410 
00411     // Unserialise the RSet object.
00412     Xapian::RSet rset = unserialise_rset(string(p, p_end - p));
00413 
00414     Stats local_stats;
00415     MultiMatch match(*db, query.get(), qlen, &rset, collapse_key,
00416                      percent_cutoff, weight_cutoff, order,
00417                      sort_key, sort_by, sort_value_forward, NULL,
00418                      NULL, local_stats, wt.get());
00419 
00420     send_message(REPLY_STATS, serialise_stats(local_stats));
00421 
00422     string message;
00423 #if 0 // Reinstate this when major protocol version increases to 31.
00424     get_message(active_timeout, message, MSG_GETMSET);
00425 #else
00426     char type = get_message(active_timeout, message);
00427     if (rare(type != MSG_GETMSET)) {
00428         if (type != MSG_GETMSET_PRE_30_5 && type != MSG_GETMSET_PRE_30_3) {
00429             string errmsg("Expecting message type ");
00430             errmsg += om_tostring(MSG_GETMSET_PRE_30_3);
00431             errmsg += " or ";
00432             errmsg += om_tostring(MSG_GETMSET_PRE_30_5);
00433             errmsg += " or ";
00434             errmsg += om_tostring(MSG_GETMSET);
00435             errmsg += ", got ";
00436             errmsg += om_tostring(type);
00437             throw Xapian::NetworkError(errmsg);
00438         }
00439     }
00440 #endif
00441     p = message.c_str();
00442     p_end = p + message.size();
00443 
00444     Xapian::termcount first = decode_length(&p, p_end, false);
00445     Xapian::termcount maxitems = decode_length(&p, p_end, false);
00446 
00447     Xapian::termcount check_at_least = 0;
00448     if (type != MSG_GETMSET_PRE_30_3) {
00449         check_at_least = decode_length(&p, p_end, false);
00450     }
00451 
00452     message.erase(0, message.size() - (p_end - p));
00453     Stats total_stats(unserialise_stats(message));
00454 
00455     Xapian::MSet mset;
00456     match.get_mset(first, maxitems, check_at_least, mset, total_stats, 0, 0);
00457 
00458     if (type == MSG_GETMSET_PRE_30_3 || type == MSG_GETMSET_PRE_30_5) {
00459         send_message(REPLY_RESULTS_PRE_30_5, serialise_mset_pre_30_5(mset));
00460     } else {
00461         send_message(REPLY_RESULTS, serialise_mset(mset));
00462     }
00463 }
00464 
00465 void
00466 RemoteServer::msg_document(const string &message)
00467 {
00468     const char *p = message.data();
00469     const char *p_end = p + message.size();
00470     Xapian::docid did = decode_length(&p, p_end, false);
00471 
00472     Xapian::Document doc = db->get_document(did);
00473 
00474     send_message(REPLY_DOCDATA, doc.get_data());
00475 
00476     Xapian::ValueIterator i;
00477     for (i = doc.values_begin(); i != doc.values_end(); ++i) {
00478         string item = encode_length(i.get_valueno());
00479         item += *i;
00480         send_message(REPLY_VALUE, item);
00481     }
00482     send_message(REPLY_DONE, "");
00483 }
00484 
00485 void
00486 RemoteServer::msg_keepalive(const string &)
00487 {
00488     // Ensure *our* database stays alive, as it may contain remote databases!
00489     db->keep_alive();
00490     send_message(REPLY_DONE, "");
00491 }
00492 
00493 void
00494 RemoteServer::msg_termexists(const string &term)
00495 {
00496     send_message((db->term_exists(term) ? REPLY_TERMEXISTS : REPLY_TERMDOESNTEXIST), "");
00497 }
00498 
00499 void
00500 RemoteServer::msg_collfreq(const string &term)
00501 {
00502     send_message(REPLY_COLLFREQ, encode_length(db->get_collection_freq(term)));
00503 }
00504 
00505 void
00506 RemoteServer::msg_termfreq(const string &term)
00507 {
00508     send_message(REPLY_TERMFREQ, encode_length(db->get_termfreq(term)));
00509 }
00510 
00511 void
00512 RemoteServer::msg_doclength(const string &message)
00513 {
00514     const char *p = message.data();
00515     const char *p_end = p + message.size();
00516     Xapian::docid did = decode_length(&p, p_end, false);
00517     // FIXME: get_doclength should always return an integer, but
00518     // Xapian::doclength is a double...
00519     send_message(REPLY_DOCLENGTH, serialise_double(db->get_doclength(did)));
00520 }
00521 
00522 void
00523 RemoteServer::msg_flush(const string &)
00524 {
00525     if (!wdb)
00526         throw Xapian::InvalidOperationError("Server is read-only");
00527 
00528     wdb->flush();
00529 
00530     send_message(REPLY_DONE, "");
00531 }
00532 
00533 void
00534 RemoteServer::msg_cancel(const string &)
00535 {
00536     if (!wdb)
00537         throw Xapian::InvalidOperationError("Server is read-only");
00538 
00539     // We can't call cancel since that's an internal method, but this
00540     // has the same effect with minimal additional overhead.
00541     wdb->begin_transaction(false);
00542     wdb->cancel_transaction();
00543 }
00544 
00545 void
00546 RemoteServer::msg_adddocument(const string & message)
00547 {
00548     if (!wdb)
00549         throw Xapian::InvalidOperationError("Server is read-only");
00550 
00551     Xapian::docid did = wdb->add_document(unserialise_document(message));
00552 
00553     send_message(REPLY_ADDDOCUMENT, encode_length(did));
00554 }
00555 
00556 // FIXME: eliminate this method when we move to remote major 31.
00557 void
00558 RemoteServer::msg_deletedocument_pre_30_2(const string & message)
00559 {
00560     if (!wdb)
00561         throw Xapian::InvalidOperationError("Server is read-only");
00562 
00563     const char *p = message.data();
00564     const char *p_end = p + message.size();
00565     Xapian::docid did = decode_length(&p, p_end, false);
00566 
00567     wdb->delete_document(did);
00568 }
00569 
00570 void
00571 RemoteServer::msg_deletedocument(const string & message)
00572 {
00573     msg_deletedocument_pre_30_2(message);
00574 
00575     send_message(REPLY_DONE, "");
00576 }
00577 
00578 void
00579 RemoteServer::msg_deletedocumentterm(const string & message)
00580 {
00581     if (!wdb)
00582         throw Xapian::InvalidOperationError("Server is read-only");
00583 
00584     wdb->delete_document(message);
00585 }
00586 
00587 void
00588 RemoteServer::msg_replacedocument(const string & message)
00589 {
00590     if (!wdb)
00591         throw Xapian::InvalidOperationError("Server is read-only");
00592 
00593     const char *p = message.data();
00594     const char *p_end = p + message.size();
00595     Xapian::docid did = decode_length(&p, p_end, false);
00596 
00597     wdb->replace_document(did, unserialise_document(string(p, p_end)));
00598 }
00599 
00600 void
00601 RemoteServer::msg_replacedocumentterm(const string & message)
00602 {
00603     if (!wdb)
00604         throw Xapian::InvalidOperationError("Server is read-only");
00605 
00606     const char *p = message.data();
00607     const char *p_end = p + message.size();
00608     size_t len = decode_length(&p, p_end, true);
00609     string unique_term(p, len);
00610     p += len;
00611 
00612     Xapian::docid did = wdb->replace_document(unique_term, unserialise_document(string(p, p_end)));
00613 
00614     send_message(REPLY_ADDDOCUMENT, encode_length(did));
00615 }

Documentation for Xapian (version 1.0.10).
Generated on 24 Dec 2008 by Doxygen 1.5.2.