00001
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022 #include <config.h>
00023
00024 #include "safeerrno.h"
00025 #include <signal.h>
00026
00027 #include <xapian/error.h>
00028
00029 #include "autoptr.h"
00030 #include "emptypostlist.h"
00031 #include "inmemory_positionlist.h"
00032 #include "net_postlist.h"
00033 #include "net_termlist.h"
00034 #include "net_document.h"
00035 #include "omassert.h"
00036 #include "serialise.h"
00037 #include "serialise-double.h"
00038 #include "stats.h"
00039 #include "stringutils.h"
00040 #include "utils.h"
00041
00042 #include <string>
00043 #include <vector>
00044
00045 using namespace std;
00046
00047 RemoteDatabase::RemoteDatabase(int fd, Xapian::timeout timeout_,
00048 const string & context_, bool writable)
00049 : link(fd, fd, context_),
00050 context(context_),
00051 cached_stats_valid(),
00052 timeout(timeout_)
00053 {
00054 #ifndef __WIN32__
00055
00056
00057 if (signal(SIGPIPE, SIG_IGN) == SIG_ERR) {
00058 throw Xapian::NetworkError("Couldn't set SIGPIPE to SIG_IGN", errno);
00059 }
00060 #endif
00061
00062 if (!writable) {
00063
00064
00065
00066
00067
00068 transaction_state = TRANSACTION_UNIMPLEMENTED;
00069 }
00070
00071 string message;
00072 char type = get_message(message);
00073
00074 if (reply_type(type) != REPLY_GREETING || message.size() < 3) {
00075 if (type == 'O' && message.size() == size_t('M') && message[0] == ' ') {
00076
00077
00078
00079 throw Xapian::NetworkError("Server protocol version too old", context);
00080 }
00081 throw Xapian::NetworkError("Handshake failed - is this a Xapian server?", context);
00082 }
00083
00084 const char *p = message.c_str();
00085 const char *p_end = p + message.size();
00086
00087
00088
00089 int protocol_major = static_cast<unsigned char>(*p++);
00090 int protocol_minor = static_cast<unsigned char>(*p++);
00091 if (protocol_major != XAPIAN_REMOTE_PROTOCOL_MAJOR_VERSION ||
00092 protocol_minor < XAPIAN_REMOTE_PROTOCOL_MINOR_VERSION) {
00093 string errmsg("Unknown protocol version ");
00094 errmsg += om_tostring(protocol_major);
00095 errmsg += '.';
00096 errmsg += om_tostring(protocol_minor);
00097 errmsg += " ("STRINGIZE(XAPIAN_REMOTE_PROTOCOL_MAJOR_VERSION)"."STRINGIZE(XAPIAN_REMOTE_PROTOCOL_MINOR_VERSION)" supported)";
00098 throw Xapian::NetworkError(errmsg, context);
00099 }
00100
00101 doccount = decode_length(&p, p_end, false);
00102 lastdocid = decode_length(&p, p_end, false);
00103 if (p == p_end) {
00104 throw Xapian::NetworkError("Bad greeting message received (bool)", context);
00105 }
00106 has_positional_info = (*p++ == '1');
00107 avlength = unserialise_double(&p, p_end);
00108 if (p != p_end || avlength < 0) {
00109 throw Xapian::NetworkError("Bad greeting message received (double)", context);
00110 }
00111 }
00112
00113 RemoteDatabase *
00114 RemoteDatabase::as_remotedatabase()
00115 {
00116 return this;
00117 }
00118
00119 void
00120 RemoteDatabase::keep_alive()
00121 {
00122 send_message(MSG_KEEPALIVE, "");
00123 string message;
00124 get_message(message, REPLY_DONE);
00125 }
00126
00127 TermList *
00128 RemoteDatabase::open_term_list(Xapian::docid did) const
00129 {
00130 if (did == 0) throw Xapian::InvalidArgumentError("Docid 0 invalid");
00131
00132
00133 if (!cached_stats_valid) update_stats();
00134
00135 send_message(MSG_TERMLIST, encode_length(did));
00136
00137 string message;
00138 get_message(message, REPLY_DOCLENGTH);
00139 const char * p = message.c_str();
00140 const char * p_end = p + message.size();
00141 Xapian::doclength doclen = unserialise_double(&p, p_end);
00142 if (p != p_end || doclen < 0) {
00143 throw Xapian::NetworkError("Bad REPLY_DOCLENGTH message received", context);
00144 }
00145
00146 AutoPtr<NetworkTermList> tlist;
00147 tlist = new NetworkTermList(doclen, doccount,
00148 Xapian::Internal::RefCntPtr<const RemoteDatabase>(this),
00149 did);
00150 vector<NetworkTermListItem> & items = tlist->items;
00151
00152 char type;
00153 while ((type = get_message(message)) == REPLY_TERMLIST) {
00154 NetworkTermListItem item;
00155 p = message.data();
00156 p_end = p + message.size();
00157 item.wdf = decode_length(&p, p_end, false);
00158 item.termfreq = decode_length(&p, p_end, false);
00159 item.tname.assign(p, p_end);
00160 items.push_back(item);
00161 }
00162 if (type != REPLY_DONE) {
00163 throw Xapian::NetworkError("Bad message received", context);
00164 }
00165
00166 tlist->current_position = tlist->items.begin();
00167 return tlist.release();
00168 }
00169
00170 TermList *
00171 RemoteDatabase::open_allterms(const string & prefix) const {
00172
00173 if (!cached_stats_valid) update_stats();
00174
00175 send_message(MSG_ALLTERMS, prefix);
00176
00177 AutoPtr<NetworkTermList> tlist;
00178 tlist = new NetworkTermList(0.0, doccount,
00179 Xapian::Internal::RefCntPtr<const RemoteDatabase>(this),
00180 0);
00181 vector<NetworkTermListItem> & items = tlist->items;
00182
00183 string message;
00184 char type;
00185 while ((type = get_message(message)) == REPLY_ALLTERMS) {
00186 NetworkTermListItem item;
00187 const char * p = message.data();
00188 const char * p_end = p + message.size();
00189 item.termfreq = decode_length(&p, p_end, false);
00190 item.tname.assign(p, p_end);
00191 items.push_back(item);
00192 }
00193 if (type != REPLY_DONE) {
00194 throw Xapian::NetworkError("Bad message received", context);
00195 }
00196
00197 tlist->current_position = tlist->items.begin();
00198 return tlist.release();
00199 }
00200
00201 LeafPostList *
00202 RemoteDatabase::open_post_list(const string &term) const
00203 {
00204 return new NetworkPostList(Xapian::Internal::RefCntPtr<const RemoteDatabase>(this), term);
00205 }
00206
00207 Xapian::doccount
00208 RemoteDatabase::read_post_list(const string &term, NetworkPostList & pl) const
00209 {
00210 send_message(MSG_POSTLIST, term);
00211
00212 string message;
00213 char type;
00214 get_message(message, REPLY_POSTLISTSTART);
00215
00216 const char * p = message.data();
00217 const char * p_end = p + message.size();
00218 Xapian::doccount termfreq = decode_length(&p, p_end, false);
00219
00220 while ((type = get_message(message)) == REPLY_POSTLISTITEM) {
00221 pl.append_posting(message);
00222 }
00223 if (type != REPLY_DONE) {
00224 throw Xapian::NetworkError("Bad message received", context);
00225 }
00226
00227 return termfreq;
00228 }
00229
00230 PositionList *
00231 RemoteDatabase::open_position_list(Xapian::docid did, const string &term) const
00232 {
00233 send_message(MSG_POSITIONLIST, encode_length(did) + term);
00234
00235 vector<Xapian::termpos> positions;
00236
00237 string message;
00238 char type;
00239 Xapian::termpos lastpos = static_cast<Xapian::termpos>(-1);
00240 while ((type = get_message(message)) == REPLY_POSITIONLIST) {
00241 const char * p = message.data();
00242 const char * p_end = p + message.size();
00243 lastpos += decode_length(&p, p_end, false) + 1;
00244 positions.push_back(lastpos);
00245 }
00246 if (type != REPLY_DONE) {
00247 throw Xapian::NetworkError("Bad message received", context);
00248 }
00249
00250 return new InMemoryPositionList(positions);
00251 }
00252
00253 bool
00254 RemoteDatabase::has_positions() const
00255 {
00256 if (!cached_stats_valid) update_stats();
00257 return has_positional_info;
00258 }
00259
00260 void
00261 RemoteDatabase::reopen()
00262 {
00263 update_stats(MSG_REOPEN);
00264 }
00265
00266
00267
00268
00269
00270
00271
00272 Xapian::Document::Internal *
00273 RemoteDatabase::open_document(Xapian::docid did, bool ) const
00274 {
00275 if (did == 0) throw Xapian::InvalidArgumentError("Docid 0 invalid");
00276
00277 send_message(MSG_DOCUMENT, encode_length(did));
00278 string doc_data;
00279 map<Xapian::valueno, string> values;
00280 get_message(doc_data, REPLY_DOCDATA);
00281
00282 reply_type type;
00283 string message;
00284 while ((type = get_message(message)) == REPLY_VALUE) {
00285 const char * p = message.data();
00286 const char * p_end = p + message.size();
00287 Xapian::valueno valueno = decode_length(&p, p_end, false);
00288 values.insert(make_pair(valueno, string(p, p_end)));
00289 }
00290 if (type != REPLY_DONE) {
00291 throw Xapian::NetworkError("Bad message received", context);
00292 }
00293
00294 return new NetworkDocument(this, did, doc_data, values);
00295 }
00296
00297 void
00298 RemoteDatabase::update_stats(message_type msg_code) const
00299 {
00300 send_message(msg_code, "");
00301 string message;
00302 get_message(message, REPLY_UPDATE);
00303 const char * p = message.c_str();
00304 const char * p_end = p + message.size();
00305 doccount = decode_length(&p, p_end, false);
00306 lastdocid = decode_length(&p, p_end, false);
00307 if (p == p_end) {
00308 throw Xapian::NetworkError("Bad REPLY_UPDATE message received", context);
00309 }
00310 has_positional_info = (*p++ == '1');
00311 avlength = unserialise_double(&p, p_end);
00312 if (p != p_end || avlength < 0) {
00313 throw Xapian::NetworkError("Bad REPLY_UPDATE message received", context);
00314 }
00315 cached_stats_valid = true;
00316 }
00317
00318 Xapian::doccount
00319 RemoteDatabase::get_doccount() const
00320 {
00321 if (!cached_stats_valid) update_stats();
00322 return doccount;
00323 }
00324
00325 Xapian::docid
00326 RemoteDatabase::get_lastdocid() const
00327 {
00328 if (!cached_stats_valid) update_stats();
00329 return lastdocid;
00330 }
00331
00332 Xapian::doclength
00333 RemoteDatabase::get_avlength() const
00334 {
00335 if (!cached_stats_valid) update_stats();
00336 return avlength;
00337 }
00338
00339 bool
00340 RemoteDatabase::term_exists(const string & tname) const
00341 {
00342 Assert(!tname.empty());
00343 send_message(MSG_TERMEXISTS, tname);
00344 string message;
00345 reply_type type = get_message(message);
00346 if (type != REPLY_TERMEXISTS && type != REPLY_TERMDOESNTEXIST) {
00347 throw Xapian::NetworkError("Bad message received", context);
00348 }
00349 return (type == REPLY_TERMEXISTS);
00350 }
00351
00352 Xapian::doccount
00353 RemoteDatabase::get_termfreq(const string & tname) const
00354 {
00355 Assert(!tname.empty());
00356 send_message(MSG_TERMFREQ, tname);
00357 string message;
00358 get_message(message, REPLY_TERMFREQ);
00359 const char * p = message.data();
00360 const char * p_end = p + message.size();
00361 return decode_length(&p, p_end, false);
00362 }
00363
00364 Xapian::termcount
00365 RemoteDatabase::get_collection_freq(const string & tname) const
00366 {
00367 Assert(!tname.empty());
00368 send_message(MSG_COLLFREQ, tname);
00369 string message;
00370 get_message(message, REPLY_COLLFREQ);
00371 const char * p = message.data();
00372 const char * p_end = p + message.size();
00373 return decode_length(&p, p_end, false);
00374 }
00375
00376 Xapian::doclength
00377 RemoteDatabase::get_doclength(Xapian::docid did) const
00378 {
00379 Assert(did != 0);
00380 send_message(MSG_DOCLENGTH, encode_length(did));
00381 string message;
00382 get_message(message, REPLY_DOCLENGTH);
00383 const char * p = message.c_str();
00384 const char * p_end = p + message.size();
00385 Xapian::doclength doclen = unserialise_double(&p, p_end);
00386 if (p != p_end || doclen < 0) {
00387 throw Xapian::NetworkError("Bad REPLY_DOCLENGTH message received", context);
00388 }
00389 return doclen;
00390 }
00391
00392 reply_type
00393 RemoteDatabase::get_message(string &result, reply_type required_type) const
00394 {
00395 OmTime end_time;
00396 if (timeout) end_time = OmTime::now() + timeout;
00397
00398 reply_type type = static_cast<reply_type>(link.get_message(result, end_time));
00399 if (type == REPLY_EXCEPTION) {
00400 unserialise_error(result, "REMOTE:", context);
00401 }
00402 if (required_type != REPLY_MAX && type != required_type) {
00403 string errmsg("Expecting reply type ");
00404 errmsg += om_tostring(required_type);
00405 errmsg += ", got ";
00406 errmsg += om_tostring(type);
00407 throw Xapian::NetworkError(errmsg);
00408 }
00409
00410 return type;
00411 }
00412
00413 void
00414 RemoteDatabase::send_message(message_type type, const string &message) const
00415 {
00416 OmTime end_time;
00417 if (timeout) end_time = OmTime::now() + timeout;
00418
00419 link.send_message(static_cast<unsigned char>(type), message, end_time);
00420 }
00421
00422 void
00423 RemoteDatabase::do_close()
00424 {
00425
00426
00427
00428 bool writable = (transaction_state != TRANSACTION_UNIMPLEMENTED);
00429
00430
00431 if (writable) dtor_called();
00432
00433
00434
00435
00436
00437 link.do_close(writable);
00438 }
00439
00440 void
00441 RemoteDatabase::set_query(const Xapian::Query::Internal *query,
00442 Xapian::termcount qlen,
00443 Xapian::valueno collapse_key,
00444 Xapian::Enquire::docid_order order,
00445 Xapian::valueno sort_key,
00446 Xapian::Enquire::Internal::sort_setting sort_by,
00447 bool sort_value_forward,
00448 int percent_cutoff, Xapian::weight weight_cutoff,
00449 const Xapian::Weight *wtscheme,
00450 const Xapian::RSet &omrset)
00451 {
00452 string tmp = query->serialise();
00453 string message = encode_length(tmp.size());
00454 message += tmp;
00455
00456
00457 message += encode_length(qlen);
00458 message += encode_length(collapse_key);
00459 message += char('0' + order);
00460 message += encode_length(sort_key);
00461 message += char('0' + sort_by);
00462 message += char('0' + sort_value_forward);
00463 message += char(percent_cutoff);
00464 message += serialise_double(weight_cutoff);
00465
00466 tmp = wtscheme->name();
00467 message += encode_length(tmp.size());
00468 message += tmp;
00469
00470 tmp = wtscheme->serialise();
00471 message += encode_length(tmp.size());
00472 message += tmp;
00473
00474 message += serialise_rset(omrset);
00475
00476 send_message(MSG_QUERY, message);
00477 }
00478
00479 bool
00480 RemoteDatabase::get_remote_stats(bool nowait, Stats &out)
00481 {
00482 if (nowait && !link.ready_to_read()) return false;
00483
00484 string message;
00485 get_message(message, REPLY_STATS);
00486 out = unserialise_stats(message);
00487
00488 return true;
00489 }
00490
00491 void
00492 RemoteDatabase::send_global_stats(Xapian::doccount first,
00493 Xapian::doccount maxitems,
00494 Xapian::doccount check_at_least,
00495 const Stats &stats)
00496 {
00497 string message = encode_length(first);
00498 message += encode_length(maxitems);
00499 message += encode_length(check_at_least);
00500 message += serialise_stats(stats);
00501 send_message(MSG_GETMSET, message);
00502 }
00503
00504 void
00505 RemoteDatabase::get_mset(Xapian::MSet &mset)
00506 {
00507 string message;
00508 get_message(message, REPLY_RESULTS);
00509 mset = unserialise_mset(message);
00510 }
00511
00512 void
00513 RemoteDatabase::flush()
00514 {
00515 send_message(MSG_FLUSH, "");
00516
00517
00518 string message;
00519 get_message(message, REPLY_DONE);
00520 }
00521
00522 void
00523 RemoteDatabase::cancel()
00524 {
00525 cached_stats_valid = false;
00526
00527 send_message(MSG_CANCEL, "");
00528 }
00529
00530 Xapian::docid
00531 RemoteDatabase::add_document(const Xapian::Document & doc)
00532 {
00533 cached_stats_valid = false;
00534
00535 send_message(MSG_ADDDOCUMENT, serialise_document(doc));
00536
00537 string message;
00538 get_message(message, REPLY_ADDDOCUMENT);
00539
00540 const char * p = message.data();
00541 const char * p_end = p + message.size();
00542 return decode_length(&p, p_end, false);
00543 }
00544
00545 void
00546 RemoteDatabase::delete_document(Xapian::docid did)
00547 {
00548 cached_stats_valid = false;
00549
00550
00551 send_message(MSG_DELETEDOCUMENT, encode_length(did));
00552 string dummy;
00553 get_message(dummy, REPLY_DONE);
00554 }
00555
00556 void
00557 RemoteDatabase::delete_document(const std::string & unique_term)
00558 {
00559 cached_stats_valid = false;
00560
00561 send_message(MSG_DELETEDOCUMENTTERM, unique_term);
00562 }
00563
00564 void
00565 RemoteDatabase::replace_document(Xapian::docid did,
00566 const Xapian::Document & doc)
00567 {
00568 cached_stats_valid = false;
00569
00570 string message = encode_length(did);
00571 message += serialise_document(doc);
00572
00573 send_message(MSG_REPLACEDOCUMENT, message);
00574 }
00575
00576 Xapian::docid
00577 RemoteDatabase::replace_document(const std::string & unique_term,
00578 const Xapian::Document & doc)
00579 {
00580 cached_stats_valid = false;
00581
00582 string message = encode_length(unique_term.size());
00583 message += unique_term;
00584 message += serialise_document(doc);
00585
00586 send_message(MSG_REPLACEDOCUMENTTERM, message);
00587
00588 get_message(message, REPLY_ADDDOCUMENT);
00589
00590 const char * p = message.data();
00591 const char * p_end = p + message.size();
00592 return decode_length(&p, p_end, false);
00593 }