Main Page | Modules | Class List | Directories | File List | Class Members | File Members | Related Pages

buffering.h

00001 typedef struct dblib_buffer_row {
00003         TDSRESULTINFO *resinfo;
00005         unsigned char *row_data;
00007         DBINT row;
00009         TDS_INT *sizes;
00010 } DBLIB_BUFFER_ROW;
00011 
00012 static void buffer_struct_print(const DBPROC_ROWBUF *buf);
00013 static int buffer_save_row(DBPROCESS *dbproc);
00014 static DBLIB_BUFFER_ROW* buffer_row_address(const DBPROC_ROWBUF * buf, int idx);
00015 
00045 static int
00046 buffer_count(const DBPROC_ROWBUF *buf)
00047 {
00048         return (buf->head > buf->tail) ?
00049                 buf->head - buf->tail :                         /* |...TddddH....| */
00050                 buf->capacity - (buf->tail - buf->head);        /* |ddddH....Tddd| */
00051 }
00052  
00056 static int
00057 buffer_is_full(const DBPROC_ROWBUF *buf)
00058 {
00059         return buf->capacity == buffer_count(buf) && buf->capacity > 1;
00060 }
00061 
00062 #ifndef NDEBUG
00063 static int
00064 buffer_index_valid(const DBPROC_ROWBUF *buf, int idx)
00065 {
00066         if (buf->tail <= buf->head)
00067                 if (buf->head <= idx && idx <= buf->tail)
00068                         return 1;
00069         
00070         if (0 <= idx && idx <= buf->head)
00071                 return 1;
00072         
00073         if (buf->tail <= idx && idx < buf->capacity)
00074                 return 1;
00075 #if 0   
00076         printf("buffer_index_valid: idx = %d\n", idx);
00077         buffer_struct_print(buf);
00078 #endif
00079         return 0;       
00080 }
00081 #endif
00082 
00083 static void
00084 buffer_free_row(DBLIB_BUFFER_ROW *row)
00085 {
00086         if (row->sizes)
00087                 TDS_ZERO_FREE(row->sizes);
00088         if (row->row_data) {
00089                 tds_free_row(row->resinfo, row->row_data);
00090                 row->row_data = NULL;
00091         }
00092         tds_free_results(row->resinfo);
00093         row->resinfo = NULL;
00094 }
00095  
00096 /*
00097  * Buffer is freed at slightly odd points, whenever
00098  * capacity changes: 
00099  * 
00100  * 1. When setting capacity, to release prior buffer.  
00101  * 2. By dbresults.  When called the second time, it has to 
00102  * release prior storage because the new resultset will have
00103  * a different width.  
00104  * 3. By dbclose(), else open/close/open would leak.  
00105  */
00106 static void
00107 buffer_free(DBPROC_ROWBUF *buf)
00108 {
00109         if (buf->rows != NULL) {
00110                 int i;
00111                 for (i = 0; i < buf->capacity; ++i)
00112                         buffer_free_row(&buf->rows[i]);
00113                 TDS_ZERO_FREE(buf->rows);
00114         }
00115 }
00116 
00117 /*
00118  * When no rows are currently buffered (and the buffer is allocated)
00119  * set the indices to their initial postions.
00120  */
00121 static void
00122 buffer_reset(DBPROC_ROWBUF *buf)
00123 {
00124         buf->head = 0;
00125         buf->current = buf->tail = buf->capacity;
00126 }
00127 
00128 static int
00129 buffer_idx_increment(const DBPROC_ROWBUF *buf, int idx)
00130 {
00131         if (++idx >= buf->capacity) { 
00132                 idx = 0;
00133         }
00134         return idx;
00135 }
00136 
00141 static DBLIB_BUFFER_ROW*
00142 buffer_row_address(const DBPROC_ROWBUF * buf, int idx)
00143 {
00144         if (!(idx >= 0 && idx < buf->capacity)) {
00145                 printf("idx is %d:\n", idx);
00146                 buffer_struct_print(buf);
00147                 assert(idx >= 0);
00148                 assert(idx < buf->capacity);
00149         }
00150         
00151         return &(buf->rows[idx]);
00152 }
00153 
00157 static DBINT
00158 buffer_idx2row(const DBPROC_ROWBUF *buf, int idx)
00159 {
00160         return buffer_row_address(buf, idx)->row;
00161 }
00162 
00166 static int
00167 buffer_row2idx(const DBPROC_ROWBUF *buf, int row_number)
00168 {
00169         int i, ii, idx = -1;
00170         
00171         if (buf->tail == buf->capacity) {
00172                 assert (buf->head == 0);
00173                 return -1;      /* no rows buffered */
00174         }
00175         
00176         /* 
00177          * March through the buffers from tail to head, stop if we find our row.  
00178          * A full queue is indicated by tail == head (which means we can't write).
00179          */
00180         for (ii=0, i = buf->tail; i != buf->head || ii == 0; i = buffer_idx_increment(buf, i)) {
00181                 if( buffer_idx2row(buf, i) == row_number) {
00182                         idx = i;
00183                         break;
00184                 }
00185                 assert(ii++ < buf->capacity); /* prevent infinite loop */
00186         } 
00187         
00188         return idx;
00189 }
00190 
00196 static void
00197 buffer_delete_rows(DBPROC_ROWBUF * buf, int count)
00198 {
00199         int i;
00200 
00201         if (count < 0 || count > buffer_count(buf)) {
00202                 count = buffer_count(buf);
00203         }
00204 
00205         for (i=0; i < count; i++) {
00206                 if (buf->tail < buf->capacity)
00207                         buffer_free_row(&buf->rows[i]);
00208                 buf->tail = buffer_idx_increment(buf, buf->tail);
00209                 /* 
00210                  * If deleting rows from the buffer catches the tail to the head, 
00211                  * return to the initial postion.  Otherwise, it will look full.
00212                  */
00213                 if (buf->tail == buf->head) {
00214                         buffer_reset(buf);
00215                         break;
00216                 }
00217         }
00218 #if 0
00219         buffer_struct_print(buf);
00220 #endif
00221 }
00222 
00223 static void
00224 buffer_transfer_bound_data(DBPROC_ROWBUF *buf, TDS_INT res_type, TDS_INT compute_id, DBPROCESS * dbproc, int idx)
00225 {
00226         int i;
00227         int srctype, desttype;
00228         BYTE *src;
00229         const DBLIB_BUFFER_ROW *row;
00230 
00231         tdsdump_log(TDS_DBG_FUNC, "buffer_transfer_bound_data(%p %d %d %p %d)\n", buf, res_type, compute_id, dbproc, idx);
00232         assert(buffer_index_valid(buf, idx));
00233 
00234         row = buffer_row_address(buf, idx);
00235         assert(row->resinfo);
00236 
00237         for (i = 0; i < row->resinfo->num_cols; i++) {
00238                 DBINT srclen;
00239                 TDSCOLUMN *curcol = row->resinfo->columns[i];
00240                 
00241                 if (row->sizes)
00242                         curcol->column_cur_size = row->sizes[i];
00243 
00244                 if (curcol->column_nullbind) {
00245                         if (curcol->column_cur_size < 0) {
00246                                 *(DBINT *)(curcol->column_nullbind) = -1;
00247                         } else {
00248                                 *(DBINT *)(curcol->column_nullbind) = 0;
00249                         }
00250                 }
00251                 if (!curcol->column_varaddr)
00252                         continue;
00253 
00254                 if (row->row_data)
00255                         src = &row->row_data[curcol->column_data - row->resinfo->current_row];
00256                 else
00257                         src = curcol->column_data;
00258                 srclen = curcol->column_cur_size;
00259                 if (is_blob_type(curcol->column_type)) {
00260                         src = (BYTE *) ((TDSBLOB *) src)->textvalue;
00261                 }
00262                 desttype = _db_get_server_type(curcol->column_bindtype);
00263                 srctype = tds_get_conversion_type(curcol->column_type, curcol->column_size);
00264 
00265                 if (srclen <= 0) {
00266                         if (srclen == 0 || !curcol->column_nullbind)
00267                                 dbgetnull(dbproc, curcol->column_bindtype, curcol->column_bindlen,
00268                                                 (BYTE *) curcol->column_varaddr);
00269                 } else {
00270                         copy_data_to_host_var(dbproc, srctype, src, srclen, desttype, 
00271                                                 (BYTE *) curcol->column_varaddr,  curcol->column_bindlen,
00272                                                          curcol->column_bindtype, curcol->column_nullbind);
00273                 }
00274         }
00275 
00276         /*
00277          * This function always bumps current.  Usually, it's called 
00278          * by dbnextrow(), so bumping current is a pretty obvious choice.  
00279          * It can also be called by dbgetrow(), but that function also 
00280          * causes the bump.  If you call dbgetrow() for row N, a subsequent
00281          * call to dbnextrow() yields N+1.  
00282          */
00283         buf->current = buffer_idx_increment(buf, buf->current);
00284 
00285 }       /* end buffer_transfer_bound_data()  */
00286 
00287 static void 
00288 buffer_struct_print(const DBPROC_ROWBUF *buf)
00289 {
00290         assert(buf);
00291 
00292         printf("\t%d rows in buffer\n",         buffer_count(buf));
00293         
00294         printf("\thead = %d\t",                 buf->head);
00295         printf("\ttail = %d\t",                 buf->tail);
00296         printf("\tcurrent = %d\n",              buf->current);
00297         printf("\tcapacity = %d\t",             buf->capacity);
00298         printf("\thead row number = %d\n",      buf->received);
00299 }
00300 
00301 /* * * Functions called only by public db-lib API take DBPROCESS* * */
00302 
00319 static int
00320 buffer_current_index(const DBPROCESS *dbproc)
00321 {
00322         const DBPROC_ROWBUF *buf = &dbproc->row_buf;
00323 #if 0
00324         buffer_struct_print(buf);
00325 #endif
00326         if (buf->capacity <= 1) /* no buffering */
00327                 return -1;
00328         if (buf->current == buf->head || buf->current == buf->capacity)
00329                 return -1;
00330                 
00331         assert(buf->current >= 0);
00332         assert(buf->current < buf->capacity);
00333         
00334         if( buf->tail < buf->head) {
00335                 assert(buf->tail < buf->current);
00336                 assert(buf->current < buf->head);
00337         } else {
00338                 if (buf->current > buf->head)
00339                         assert(buf->current > buf->tail);
00340         }
00341         return buf->current;
00342 }
00343 
00344 /*
00345  * Normally called by dbsetopt() to prepare for buffering
00346  * Called with nrows == 0 by dbopen to safely set buf->rows to NULL.  
00347  */
00348 static void
00349 buffer_set_capacity(DBPROCESS *dbproc, int nrows)
00350 {
00351         DBPROC_ROWBUF *buf = &dbproc->row_buf;
00352         
00353         buffer_free(buf);
00354 
00355         memset(buf, 0, sizeof(DBPROC_ROWBUF));
00356 
00357         if (0 == nrows) {
00358                 buf->capacity = 1;
00359                 return;
00360         }
00361 
00362         assert(0 < nrows);
00363 
00364         buf->capacity = nrows;
00365 }
00366 
00367 /*
00368  * Called only by dbresults(); capacity must be >= 1. 
00369  * Sybase's documents say dbresults() cannot return FAIL if the prior calls worked, 
00370  * which is a little strange, because (for FreeTDS, at least), dbresults
00371  * is when we learn about the result set's width.  Without that information, we
00372  * can't allocate memory for the buffer.  But if we *fail* to allocate memory, 
00373  * we're not to communicate it back to the caller?   
00374  */
00375 static void
00376 buffer_alloc(DBPROCESS *dbproc)
00377 {
00378         DBPROC_ROWBUF *buf = &dbproc->row_buf;
00379         
00380         /* Call this function only after setting capacity. */
00381 
00382         assert(buf);
00383         assert(buf->capacity > 0);
00384         assert(buf->rows == NULL);
00385         
00386         buf->rows = (DBLIB_BUFFER_ROW *) calloc(buf->capacity, sizeof(DBLIB_BUFFER_ROW));
00387         
00388         assert(buf->rows);
00389         
00390         buffer_reset(buf);
00391         
00392         buf->received = 0;
00393 }
00394 
00399 static int
00400 buffer_add_row(DBPROCESS *dbproc, TDSRESULTINFO *resinfo)
00401 {
00402         DBPROC_ROWBUF *buf = &dbproc->row_buf;
00403         DBLIB_BUFFER_ROW *row;
00404         int i;
00405 
00406         assert(buf->capacity >= 0);
00407 
00408         if (buffer_is_full(buf))
00409                 return -1;
00410 
00411         /* initial condition is head == 0 and tail == capacity */
00412         if (buf->tail == buf->capacity) {
00413                 /* bumping this tail will set it to zero */
00414                 assert(buf->head == 0);
00415                 buf->tail = buffer_idx_increment(buf, buf->tail);
00416         }
00417 
00418         row = buffer_row_address(buf, buf->head);
00419 
00420         /* bump the row number, write it, and move the data to head */
00421         if (row->resinfo) {
00422                 tds_free_row(row->resinfo, row->row_data);
00423                 tds_free_results(row->resinfo);
00424         }
00425         row->row = ++buf->received;
00426         ++resinfo->ref_count;
00427         row->resinfo = resinfo;
00428         row->row_data = NULL;
00429         if (row->sizes)
00430                 free(row->sizes);
00431         row->sizes = (TDS_INT *) calloc(resinfo->num_cols, sizeof(TDS_INT));
00432         for (i = 0; i < resinfo->num_cols; ++i)
00433                 row->sizes[i] = resinfo->columns[i]->column_cur_size;
00434 
00435         /* update current, bump the head */
00436         buf->current = buf->head;
00437         buf->head = buffer_idx_increment(buf, buf->head);
00438 
00439         return buf->current;
00440 }
00441 
00442 static int
00443 buffer_save_row(DBPROCESS *dbproc)
00444 {
00445         DBPROC_ROWBUF *buf = &dbproc->row_buf;
00446         DBLIB_BUFFER_ROW *row;
00447         int idx = buf->head - 1;
00448 
00449         if (buf->capacity <= 1)
00450                 return SUCCEED;
00451 
00452         if (idx < 0)
00453                 idx = buf->capacity - 1;
00454         if (idx >= 0 && idx < buf->capacity) {
00455                 row = &buf->rows[idx];
00456 
00457                 if (row->resinfo && !row->row_data) {
00458                         row->row_data = row->resinfo->current_row;
00459                         tds_alloc_row(row->resinfo);
00460                 }
00461         }
00462 
00463         return SUCCEED;
00464 }
00465 

Generated on Wed May 7 19:22:09 2008 for FreeTDS API by  doxygen 1.4.1