00001 #include <sys/types.h>
00002
00003 #include <ctype.h>
00004 #include <stdio.h>
00005 #include <stdlib.h>
00006 #include <string.h>
00007 #include <time.h>
00008 #include <unistd.h>
00009
00010 #include <atmi.h>
00011 #include <fml1632.h>
00012 #include <fml32.h>
00013 #include <tx.h>
00014 #include <xa.h>
00015
00016 #include <db.h>
00017
00018 #include "datafml.h"
00019 #include "hdbrec.h"
00020 #include "hcommonxa.h"
00021 #include "htimestampxa.h"
00022
00023
00024
00025
00026 #ifdef SERVER1
00027 #define TXN_FUNC TestTxn1
00028 #define TXN_STRING "TestTxn1"
00029 #endif
00030 #ifdef SERVER2
00031 #define TXN_FUNC TestTxn2
00032 #define TXN_STRING "TestTxn2"
00033 #endif
00034 void TXN_FUNC(TPSVCINFO *);
00035
00036 #define HOME "../data"
00037 #define TABLE1 "table1.db"
00038 #define TABLE2 "table2.db"
00039
00040 #ifdef VERBOSE
00041 static int verbose = 1;
00042 #else
00043 static int verbose = 0;
00044 #endif
00045
00046 DB *db1, *db2;
00047
00048 int cnt_forward;
00049 int cnt_request;
00050
00051 char *progname;
00052
00053 char *db_buf(DBT *);
00054
00055 int
00056 tpsvrinit(int argc, char* argv[])
00057 {
00058 DB_ENV *dbenv;
00059 int ret;
00060
00061 progname = argv[0];
00062 if (verbose)
00063 printf("%s: called\n", progname);
00064
00065
00066
00067
00068
00069
00070
00071
00072
00073 if ((ret = db_env_create(&dbenv, 0)) != 0) {
00074 fprintf(stderr, "db_env_create: %s\n", db_strerror(ret));
00075 return (-1);
00076 }
00077 if ((ret = dbenv->open(dbenv, HOME, DB_JOINENV, 0)) != 0) {
00078 fprintf(stderr,
00079 "DB_ENV->open: %s: %s\n", HOME, db_strerror(ret));
00080 return (-1);
00081 }
00082 if ((ret = db_create(&db1, dbenv, 0)) != 0) {
00083 fprintf(stderr, "db_create: %s\n", db_strerror(ret));
00084 return (-1);
00085 }
00086 db1->set_errfile(db1, stderr);
00087 if ((ret = db1->open(db1, NULL,
00088 TABLE1, NULL, DB_BTREE, DB_AUTO_COMMIT | DB_CREATE, 0660)) != 0 ||
00089 (ret = db1->truncate(db1, NULL, NULL, 0)) != 0) {
00090 fprintf(stderr,
00091 "DB open/truncate: %s: %s\n", TABLE1, db_strerror(ret));
00092 return (-1);
00093 }
00094 if ((ret = db_create(&db2, dbenv, 0)) != 0) {
00095 fprintf(stderr, "db_create: %s\n", db_strerror(ret));
00096 return (-1);
00097 }
00098 db2->set_errfile(db2, stderr);
00099 if ((ret = db2->open(db2, NULL,
00100 TABLE2, NULL, DB_BTREE, DB_AUTO_COMMIT | DB_CREATE, 0660)) != 0 ||
00101 (ret = db2->truncate(db2, NULL, NULL, 0)) != 0) {
00102 fprintf(stderr,
00103 "DB open/truncate: %s: %s\n", TABLE2, db_strerror(ret));
00104 return (-1);
00105 }
00106
00107
00108 if ((ret = db1->close(db1, 0)) != 0) {
00109 fprintf(stderr,
00110 "DB->close: %s: %s\n", TABLE1, db_strerror(ret));
00111 return (-1);
00112 }
00113 if ((ret = db2->close(db2, 0)) != 0) {
00114 fprintf(stderr,
00115 "DB->close: %s: %s\n", TABLE2, db_strerror(ret));
00116 return (-1);
00117 }
00118 if ((ret = dbenv->close(dbenv, 0)) != 0) {
00119 fprintf(stderr,
00120 "DB_ENV->close: %s: %s\n", HOME, db_strerror(ret));
00121 return (-1);
00122 }
00123
00124
00125
00126 if (tx_open() == TX_ERROR) {
00127 fprintf(stderr, "tx_open: TX_ERROR\n");
00128 return (-1);
00129 }
00130
00131
00132 srand((u_int)(time(NULL) | getpid()));
00133
00134
00135 if ((ret = db_create(&db1, NULL, DB_XA_CREATE)) != 0) {
00136 fprintf(stderr, "db_create: %s\n", db_strerror(ret));
00137 return (-1);
00138 }
00139 db1->set_errfile(db1, stderr);
00140 if ((ret = db1->open(db1, NULL,
00141 TABLE1, NULL, DB_BTREE, DB_AUTO_COMMIT | DB_CREATE, 0660)) != 0) {
00142 fprintf(stderr, "DB open: %s: %s\n", TABLE1, db_strerror(ret));
00143 return (-1);
00144 }
00145 if ((ret = db_create(&db2, NULL, DB_XA_CREATE)) != 0) {
00146 fprintf(stderr, "db_create: %s\n", db_strerror(ret));
00147 return (-1);
00148 }
00149 db2->set_errfile(db2, stderr);
00150 if ((ret = db2->open(db2, NULL,
00151 TABLE2, NULL, DB_BTREE, DB_AUTO_COMMIT | DB_CREATE, 0660)) != 0) {
00152 fprintf(stderr, "DB open: %s: %s\n", TABLE2, db_strerror(ret));
00153 return (-1);
00154 }
00155
00156 if (verbose)
00157 printf("%s: tpsvrinit: initialization done\n", progname);
00158
00159 return (0);
00160 }
00161
00162 void
00163 tpsvrdone()
00164 {
00165 if (db1 != NULL)
00166 (void)db1->close(db1, 0);
00167 if (db2 != NULL)
00168 (void)db2->close(db2, 0);
00169
00170 tx_close();
00171
00172 if (verbose)
00173 printf("%s: tpsvrdone: shutdown done\n", progname);
00174
00175 printf("%s: %d requests, %d requests forwarded to the other server\n",
00176 progname, cnt_request, cnt_forward);
00177 }
00178
00179 void
00180 TXN_FUNC(TPSVCINFO *msg)
00181 {
00182 DBT data;
00183 DBT key;
00184 FBFR *replyBuf;
00185 HDbRec rcrd;
00186 long replyLen, seqNo;
00187 int ret;
00188
00189 ++cnt_request;
00190
00191 #ifdef SERVER1
00192
00193
00194
00195
00196 if (rand() % 2 > 0) {
00197 ++cnt_forward;
00198
00199 replyLen = 1024;
00200 if ((replyBuf =
00201 (FBFR*)tpalloc("FML32", NULL, replyLen)) == NULL ||
00202 tpcall("TestTxn2", msg->data,
00203 0, (char**)&replyBuf, &replyLen, TPSIGRSTRT) == -1) {
00204 fprintf(stderr, "%s: TUXEDO ERROR: %s (code %d)\n",
00205 progname, tpstrerror(tperrno), tperrno);
00206 tpfree((char*)replyBuf);
00207 tpreturn(TPFAIL, 0L, 0, 0L, 0);
00208 } else {
00209 tpfree((char*)replyBuf);
00210 tpreturn(TPSUCCESS, tpurcode, 0, 0L, 0);
00211 }
00212 return;
00213 }
00214 #endif
00215
00216 if (Fget((FBFR*)msg->data, SEQ_NO, 0, (char *)&rcrd.SeqNo, 0) == -1)
00217 goto fml_err;
00218 if (Fget((FBFR*)msg->data, TS_SEC, 0, (char *)&rcrd.Ts.Sec, 0) == -1)
00219 goto fml_err;
00220 if (Fget(
00221 (FBFR*)msg->data, TS_USEC, 0, (char *)&rcrd.Ts.Usec, 0) == -1) {
00222 fml_err: fprintf(stderr, "%s: FML ERROR: %s (code %d)\n",
00223 progname, Fstrerror(Ferror), Ferror);
00224 goto err;
00225 }
00226
00227 seqNo = rcrd.SeqNo;
00228 memset(&key, 0, sizeof(key));
00229 key.data = &seqNo;
00230 key.size = sizeof(seqNo);
00231 memset(&data, 0, sizeof(data));
00232 data.data = &rcrd;
00233 data.size = sizeof(rcrd);
00234
00235 strcpy(rcrd.Msg, "Table1");
00236 if (verbose) {
00237 printf("put1: key: %s\n", db_buf(&key));
00238 printf("put1: data: %s\n", db_buf(&data));
00239 }
00240 if ((ret = db1->put(db1, NULL, &key, &data, 0)) != 0) {
00241 fprintf(stderr, "%s: %s: Table1->put: %s\n",
00242 progname, TXN_STRING, db_strerror(ret));
00243 goto err;
00244 }
00245
00246 strcpy(rcrd.Msg, "Table2");
00247 if ((ret = db2->put(db2, NULL, &key, &data, 0)) != 0) {
00248 fprintf(stderr, "%s: %s: Table2->put: %s\n",
00249 progname, TXN_STRING, db_strerror(ret));
00250 goto err;
00251 }
00252
00253
00254
00255
00256
00257
00258
00259 if (rand() % 10 > 7) {
00260 if (verbose)
00261 printf("%s: %s: commit\n", progname, TXN_STRING);
00262 tpreturn(TPSUCCESS, 0L, 0, 0L, 0);
00263 } else {
00264 if (verbose)
00265 printf("%s: %s: abort\n", progname, TXN_STRING);
00266 tpreturn(TPSUCCESS, 1L, 0, 0L, 0);
00267 }
00268 return;
00269
00270 err: tpreturn(TPFAIL, 1L, 0, 0L, 0);
00271 }
00272
00273 char *
00274 db_buf(dbt)
00275 DBT *dbt;
00276 {
00277 static u_char buf[1024];
00278 size_t len;
00279 u_char *p, *b;
00280
00281 for (p = dbt->data, len = dbt->size, b = buf; len > 0; ++p, --len)
00282 if (isprint(*p))
00283 b += sprintf((char *)b, "%c", *p);
00284 else
00285 b += sprintf((char *)b, "%#o", *p);
00286 return ((char *)buf);
00287 }