00001
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022 #include <config.h>
00023 #include "remoteserver.h"
00024
00025 #include <xapian/database.h>
00026 #include <xapian/enquire.h>
00027 #include <xapian/error.h>
00028 #include <xapian/valueiterator.h>
00029
00030 #include "safeerrno.h"
00031 #include <signal.h>
00032 #include <stdlib.h>
00033
00034 #include "autoptr.h"
00035 #include "multimatch.h"
00036 #include "omassert.h"
00037 #include "omtime.h"
00038 #include "serialise.h"
00039 #include "serialise-double.h"
00040 #include "stats.h"
00041 #include "utils.h"
00042
/// Tag type thrown by RemoteServer::get_message() when MSG_SHUTDOWN is
/// received; RemoteServer::run() catches it and returns.
00044 struct ConnectionClosed { };
00045
00046 RemoteServer::RemoteServer(const std::vector<std::string> &dbpaths,
00047 int fdin_, int fdout_,
00048 Xapian::timeout active_timeout_,
00049 Xapian::timeout idle_timeout_,
00050 bool writable)
00051 : RemoteConnection(fdin_, fdout_, ""),
00052 db(NULL), wdb(NULL),
00053 active_timeout(active_timeout_), idle_timeout(idle_timeout_)
00054 {
00055
00056 try {
00057 if (writable) {
00058 AssertEq(dbpaths.size(), 1);
00059 wdb = new Xapian::WritableDatabase(dbpaths[0], Xapian::DB_CREATE_OR_OPEN);
00060 db = wdb;
00061 } else {
00062 db = new Xapian::Database;
00063 vector<std::string>::const_iterator i;
00064 for (i = dbpaths.begin(); i != dbpaths.end(); ++i)
00065 db->add_database(Xapian::Database(*i));
00066 }
00067
00068
00069
00070
00071 context = dbpaths[0];
00072 vector<std::string>::const_iterator i(dbpaths.begin());
00073 for (++i; i != dbpaths.end(); ++i) {
00074 context += ' ';
00075 context += *i;
00076 }
00077 } catch (const Xapian::Error &err) {
00078
00079 send_message(REPLY_EXCEPTION, serialise_error(err));
00080
00081 throw;
00082 }
00083
00084 #ifndef __WIN32__
00085
00086
00087 if (signal(SIGPIPE, SIG_IGN) == SIG_ERR)
00088 throw Xapian::NetworkError("Couldn't set SIGPIPE to SIG_IGN", errno);
00089 #endif
00090
00091
00092 string message;
00093 message += char(XAPIAN_REMOTE_PROTOCOL_MAJOR_VERSION);
00094 message += char(XAPIAN_REMOTE_PROTOCOL_MINOR_VERSION);
00095 message += encode_length(db->get_doccount());
00096 message += encode_length(db->get_lastdocid());
00097 message += (db->has_positions() ? '1' : '0');
00098 message += serialise_double(db->get_avlength());
00099 send_message(REPLY_GREETING, message);
00100
00101
00102 Xapian::Weight * weight;
00103 weight = new Xapian::BM25Weight();
00104 wtschemes[weight->name()] = weight;
00105 weight = new Xapian::BoolWeight();
00106 wtschemes[weight->name()] = weight;
00107 weight = new Xapian::TradWeight();
00108 wtschemes[weight->name()] = weight;
00109 }
00110
00111 RemoteServer::~RemoteServer()
00112 {
00113 delete db;
00114
00115
00116 map<string, Xapian::Weight*>::const_iterator i;
00117 for (i = wtschemes.begin(); i != wtschemes.end(); ++i) {
00118 delete i->second;
00119 }
00120 }
00121
00122 message_type
00123 RemoteServer::get_message(Xapian::timeout timeout, string & result,
00124 message_type required_type)
00125 {
00126 unsigned int type;
00127 OmTime end_time;
00128 if (timeout)
00129 end_time = OmTime::now() + timeout;
00130 type = RemoteConnection::get_message(result, end_time);
00131
00132
00133 if (type == MSG_SHUTDOWN) throw ConnectionClosed();
00134 if (type >= MSG_MAX) {
00135 string errmsg("Invalid message type ");
00136 errmsg += om_tostring(type);
00137 throw Xapian::NetworkError(errmsg);
00138 }
00139 if (required_type != MSG_MAX && type != unsigned(required_type)) {
00140 string errmsg("Expecting message type ");
00141 errmsg += om_tostring(required_type);
00142 errmsg += ", got ";
00143 errmsg += om_tostring(type);
00144 throw Xapian::NetworkError(errmsg);
00145 }
00146 return static_cast<message_type>(type);
00147 }
00148
00149 void
00150 RemoteServer::send_message(reply_type type, const string &message)
00151 {
00152 OmTime end_time;
00153 if (active_timeout)
00154 end_time = OmTime::now() + active_timeout;
00155 unsigned char type_as_char = static_cast<unsigned char>(type);
00156 RemoteConnection::send_message(type_as_char, message, end_time);
00157 }
00158
00159 typedef void (RemoteServer::* dispatch_func)(const string &);
00160
00161 void
00162 RemoteServer::run()
00163 {
00164 while (true) {
00165 try {
00166
00167
00168
00169
00170
00171 static const dispatch_func dispatch[] = {
00172 &RemoteServer::msg_allterms,
00173 &RemoteServer::msg_collfreq,
00174 &RemoteServer::msg_document,
00175 &RemoteServer::msg_termexists,
00176 &RemoteServer::msg_termfreq,
00177 &RemoteServer::msg_keepalive,
00178 &RemoteServer::msg_doclength,
00179 &RemoteServer::msg_query,
00180 &RemoteServer::msg_termlist,
00181 &RemoteServer::msg_positionlist,
00182 &RemoteServer::msg_postlist,
00183 &RemoteServer::msg_reopen,
00184 &RemoteServer::msg_update,
00185 &RemoteServer::msg_adddocument,
00186 &RemoteServer::msg_cancel,
00187 &RemoteServer::msg_deletedocument_pre_30_2,
00188 &RemoteServer::msg_deletedocumentterm,
00189 &RemoteServer::msg_flush,
00190 &RemoteServer::msg_replacedocument,
00191 &RemoteServer::msg_replacedocumentterm,
00192 NULL,
00193 NULL,
00194 &RemoteServer::msg_deletedocument
00195 };
00196
00197 string message;
00198 size_t type = get_message(idle_timeout, message);
00199 if (type >= sizeof(dispatch)/sizeof(dispatch[0]) ||
00200 dispatch[type] == NULL) {
00201 string errmsg("Unexpected message type ");
00202 errmsg += om_tostring(type);
00203 throw Xapian::InvalidArgumentError(errmsg);
00204 }
00205 (this->*(dispatch[type]))(message);
00206 } catch (const Xapian::NetworkTimeoutError & e) {
00207 try {
00208
00209
00210
00211 RemoteConnection::send_message(REPLY_EXCEPTION, serialise_error(e), OmTime::now());
00212 } catch (...) {
00213 }
00214
00215
00216 throw;
00217 } catch (const Xapian::NetworkError) {
00218
00219
00220
00221
00222
00223 throw;
00224 } catch (const Xapian::Error &e) {
00225
00226
00227 send_message(REPLY_EXCEPTION, serialise_error(e));
00228 } catch (ConnectionClosed &) {
00229 return;
00230 } catch (...) {
00231
00232 send_message(REPLY_EXCEPTION, "");
00233
00234
00235 throw;
00236 }
00237 }
00238 }
00239
00240 void
00241 RemoteServer::msg_allterms(const string &message)
00242 {
00243 const char *p = message.data();
00244 const char *p_end = p + message.size();
00245 string prefix(p, p_end - p);
00246
00247 const Xapian::TermIterator end = db->allterms_end(prefix);
00248 for (Xapian::TermIterator t = db->allterms_begin(prefix); t != end; ++t) {
00249 string item = encode_length(t.get_termfreq());
00250 item += *t;
00251 send_message(REPLY_ALLTERMS, item);
00252 }
00253
00254 send_message(REPLY_DONE, "");
00255 }
00256
00257 void
00258 RemoteServer::msg_termlist(const string &message)
00259 {
00260 const char *p = message.data();
00261 const char *p_end = p + message.size();
00262 Xapian::docid did = decode_length(&p, p_end, false);
00263
00264 send_message(REPLY_DOCLENGTH, serialise_double(db->get_doclength(did)));
00265 const Xapian::TermIterator end = db->termlist_end(did);
00266 for (Xapian::TermIterator t = db->termlist_begin(did); t != end; ++t) {
00267 string item = encode_length(t.get_wdf());
00268 item += encode_length(t.get_termfreq());
00269 item += *t;
00270 send_message(REPLY_TERMLIST, item);
00271 }
00272
00273 send_message(REPLY_DONE, "");
00274 }
00275
00276 void
00277 RemoteServer::msg_positionlist(const string &message)
00278 {
00279 const char *p = message.data();
00280 const char *p_end = p + message.size();
00281 Xapian::docid did = decode_length(&p, p_end, false);
00282 string term(p, p_end - p);
00283
00284 Xapian::termpos lastpos = static_cast<Xapian::termpos>(-1);
00285 const Xapian::PositionIterator end = db->positionlist_end(did, term);
00286 for (Xapian::PositionIterator i = db->positionlist_begin(did, term);
00287 i != end; ++i) {
00288 Xapian::termpos pos = *i;
00289 send_message(REPLY_POSITIONLIST, encode_length(pos - lastpos - 1));
00290 lastpos = pos;
00291 }
00292
00293 send_message(REPLY_DONE, "");
00294 }
00295
00296 void
00297 RemoteServer::msg_postlist(const string &message)
00298 {
00299 const char *p = message.data();
00300 const char *p_end = p + message.size();
00301 string term(p, p_end - p);
00302
00303 Xapian::doccount termfreq = db->get_termfreq(term);
00304 Xapian::termcount collfreq = db->get_collection_freq(term);
00305 send_message(REPLY_POSTLISTSTART, encode_length(termfreq) + encode_length(collfreq));
00306
00307 Xapian::docid lastdocid = 0;
00308 const Xapian::PostingIterator end = db->postlist_end(term);
00309 for (Xapian::PostingIterator i = db->postlist_begin(term);
00310 i != end; ++i) {
00311
00312 Xapian::docid newdocid = *i;
00313 string reply = encode_length(newdocid - lastdocid - 1);
00314 reply += encode_length(i.get_wdf());
00315
00316
00317
00318
00319
00320
00321
00322 reply += serialise_double(i.get_doclength());
00323
00324 send_message(REPLY_POSTLISTITEM, reply);
00325 lastdocid = newdocid;
00326 }
00327
00328 send_message(REPLY_DONE, "");
00329 }
00330
00331 void
00332 RemoteServer::msg_reopen(const string & msg)
00333 {
00334 db->reopen();
00335 msg_update(msg);
00336 }
00337
00338 void
00339 RemoteServer::msg_update(const string &)
00340 {
00341
00342
00343 db->reopen();
00344
00345 string message = encode_length(db->get_doccount());
00346 message += encode_length(db->get_lastdocid());
00347 message += (db->has_positions() ? '1' : '0');
00348 message += serialise_double(db->get_avlength());
00349 send_message(REPLY_UPDATE, message);
00350 }
00351
00352 void
00353 RemoteServer::msg_query(const string &message_in)
00354 {
00355 const char *p = message_in.c_str();
00356 const char *p_end = p + message_in.size();
00357 size_t len;
00358
00359
00360 len = decode_length(&p, p_end, true);
00361 AutoPtr<Xapian::Query::Internal> query(Xapian::Query::Internal::unserialise(string(p, len)));
00362 p += len;
00363
00364
00365 Xapian::termcount qlen = decode_length(&p, p_end, false);
00366
00367 Xapian::valueno collapse_key = decode_length(&p, p_end, false);
00368
00369 if (p_end - p < 4 || *p < '0' || *p > '2') {
00370 throw Xapian::NetworkError("bad message (docid_order)");
00371 }
00372 Xapian::Enquire::docid_order order;
00373 order = static_cast<Xapian::Enquire::docid_order>(*p++ - '0');
00374
00375 Xapian::valueno sort_key = decode_length(&p, p_end, false);
00376
00377 if (*p < '0' || *p > '3') {
00378 throw Xapian::NetworkError("bad message (sort_by)");
00379 }
00380 Xapian::Enquire::Internal::sort_setting sort_by;
00381 sort_by = static_cast<Xapian::Enquire::Internal::sort_setting>(*p++ - '0');
00382
00383 if (*p < '0' || *p > '1') {
00384 throw Xapian::NetworkError("bad message (sort_value_forward)");
00385 }
00386 bool sort_value_forward(*p++ != '0');
00387
00388 int percent_cutoff = *p++;
00389 if (percent_cutoff < 0 || percent_cutoff > 100) {
00390 throw Xapian::NetworkError("bad message (percent_cutoff)");
00391 }
00392
00393 Xapian::weight weight_cutoff = unserialise_double(&p, p_end);
00394 if (weight_cutoff < 0) {
00395 throw Xapian::NetworkError("bad message (weight_cutoff)");
00396 }
00397
00398
00399 len = decode_length(&p, p_end, true);
00400 map<string, Xapian::Weight *>::const_iterator i;
00401 i = wtschemes.find(string(p, len));
00402 if (i == wtschemes.end()) {
00403 throw Xapian::InvalidArgumentError("Weighting scheme " + string(p, len) + " not registered");
00404 }
00405 p += len;
00406
00407 len = decode_length(&p, p_end, true);
00408 AutoPtr<Xapian::Weight> wt(i->second->unserialise(string(p, len)));
00409 p += len;
00410
00411
00412 Xapian::RSet rset = unserialise_rset(string(p, p_end - p));
00413
00414 Stats local_stats;
00415 MultiMatch match(*db, query.get(), qlen, &rset, collapse_key,
00416 percent_cutoff, weight_cutoff, order,
00417 sort_key, sort_by, sort_value_forward, NULL,
00418 NULL, local_stats, wt.get());
00419
00420 send_message(REPLY_STATS, serialise_stats(local_stats));
00421
00422 string message;
00423 #if 0 // Reinstate this when major protocol version increases to 31.
00424 get_message(active_timeout, message, MSG_GETMSET);
00425 #else
00426 char type = get_message(active_timeout, message);
00427 if (rare(type != MSG_GETMSET)) {
00428 if (type != MSG_GETMSET_PRE_30_5 && type != MSG_GETMSET_PRE_30_3) {
00429 string errmsg("Expecting message type ");
00430 errmsg += om_tostring(MSG_GETMSET_PRE_30_3);
00431 errmsg += " or ";
00432 errmsg += om_tostring(MSG_GETMSET_PRE_30_5);
00433 errmsg += " or ";
00434 errmsg += om_tostring(MSG_GETMSET);
00435 errmsg += ", got ";
00436 errmsg += om_tostring(type);
00437 throw Xapian::NetworkError(errmsg);
00438 }
00439 }
00440 #endif
00441 p = message.c_str();
00442 p_end = p + message.size();
00443
00444 Xapian::termcount first = decode_length(&p, p_end, false);
00445 Xapian::termcount maxitems = decode_length(&p, p_end, false);
00446
00447 Xapian::termcount check_at_least = 0;
00448 if (type != MSG_GETMSET_PRE_30_3) {
00449 check_at_least = decode_length(&p, p_end, false);
00450 }
00451
00452 message.erase(0, message.size() - (p_end - p));
00453 Stats total_stats(unserialise_stats(message));
00454
00455 Xapian::MSet mset;
00456 match.get_mset(first, maxitems, check_at_least, mset, total_stats, 0, 0);
00457
00458 if (type == MSG_GETMSET_PRE_30_3 || type == MSG_GETMSET_PRE_30_5) {
00459 send_message(REPLY_RESULTS_PRE_30_5, serialise_mset_pre_30_5(mset));
00460 } else {
00461 send_message(REPLY_RESULTS, serialise_mset(mset));
00462 }
00463 }
00464
00465 void
00466 RemoteServer::msg_document(const string &message)
00467 {
00468 const char *p = message.data();
00469 const char *p_end = p + message.size();
00470 Xapian::docid did = decode_length(&p, p_end, false);
00471
00472 Xapian::Document doc = db->get_document(did);
00473
00474 send_message(REPLY_DOCDATA, doc.get_data());
00475
00476 Xapian::ValueIterator i;
00477 for (i = doc.values_begin(); i != doc.values_end(); ++i) {
00478 string item = encode_length(i.get_valueno());
00479 item += *i;
00480 send_message(REPLY_VALUE, item);
00481 }
00482 send_message(REPLY_DONE, "");
00483 }
00484
00485 void
00486 RemoteServer::msg_keepalive(const string &)
00487 {
00488
00489 db->keep_alive();
00490 send_message(REPLY_DONE, "");
00491 }
00492
00493 void
00494 RemoteServer::msg_termexists(const string &term)
00495 {
00496 send_message((db->term_exists(term) ? REPLY_TERMEXISTS : REPLY_TERMDOESNTEXIST), "");
00497 }
00498
00499 void
00500 RemoteServer::msg_collfreq(const string &term)
00501 {
00502 send_message(REPLY_COLLFREQ, encode_length(db->get_collection_freq(term)));
00503 }
00504
00505 void
00506 RemoteServer::msg_termfreq(const string &term)
00507 {
00508 send_message(REPLY_TERMFREQ, encode_length(db->get_termfreq(term)));
00509 }
00510
00511 void
00512 RemoteServer::msg_doclength(const string &message)
00513 {
00514 const char *p = message.data();
00515 const char *p_end = p + message.size();
00516 Xapian::docid did = decode_length(&p, p_end, false);
00517
00518
00519 send_message(REPLY_DOCLENGTH, serialise_double(db->get_doclength(did)));
00520 }
00521
00522 void
00523 RemoteServer::msg_flush(const string &)
00524 {
00525 if (!wdb)
00526 throw Xapian::InvalidOperationError("Server is read-only");
00527
00528 wdb->flush();
00529
00530 send_message(REPLY_DONE, "");
00531 }
00532
00533 void
00534 RemoteServer::msg_cancel(const string &)
00535 {
00536 if (!wdb)
00537 throw Xapian::InvalidOperationError("Server is read-only");
00538
00539
00540
00541 wdb->begin_transaction(false);
00542 wdb->cancel_transaction();
00543 }
00544
00545 void
00546 RemoteServer::msg_adddocument(const string & message)
00547 {
00548 if (!wdb)
00549 throw Xapian::InvalidOperationError("Server is read-only");
00550
00551 Xapian::docid did = wdb->add_document(unserialise_document(message));
00552
00553 send_message(REPLY_ADDDOCUMENT, encode_length(did));
00554 }
00555
00556
00557 void
00558 RemoteServer::msg_deletedocument_pre_30_2(const string & message)
00559 {
00560 if (!wdb)
00561 throw Xapian::InvalidOperationError("Server is read-only");
00562
00563 const char *p = message.data();
00564 const char *p_end = p + message.size();
00565 Xapian::docid did = decode_length(&p, p_end, false);
00566
00567 wdb->delete_document(did);
00568 }
00569
00570 void
00571 RemoteServer::msg_deletedocument(const string & message)
00572 {
00573 msg_deletedocument_pre_30_2(message);
00574
00575 send_message(REPLY_DONE, "");
00576 }
00577
00578 void
00579 RemoteServer::msg_deletedocumentterm(const string & message)
00580 {
00581 if (!wdb)
00582 throw Xapian::InvalidOperationError("Server is read-only");
00583
00584 wdb->delete_document(message);
00585 }
00586
00587 void
00588 RemoteServer::msg_replacedocument(const string & message)
00589 {
00590 if (!wdb)
00591 throw Xapian::InvalidOperationError("Server is read-only");
00592
00593 const char *p = message.data();
00594 const char *p_end = p + message.size();
00595 Xapian::docid did = decode_length(&p, p_end, false);
00596
00597 wdb->replace_document(did, unserialise_document(string(p, p_end)));
00598 }
00599
00600 void
00601 RemoteServer::msg_replacedocumentterm(const string & message)
00602 {
00603 if (!wdb)
00604 throw Xapian::InvalidOperationError("Server is read-only");
00605
00606 const char *p = message.data();
00607 const char *p_end = p + message.size();
00608 size_t len = decode_length(&p, p_end, true);
00609 string unique_term(p, len);
00610 p += len;
00611
00612 Xapian::docid did = wdb->replace_document(unique_term, unserialise_document(string(p, p_end)));
00613
00614 send_message(REPLY_ADDDOCUMENT, encode_length(did));
00615 }