00001 package db.txn;
00002
00003 import com.sleepycat.bind.EntryBinding;
00004 import com.sleepycat.bind.serial.StoredClassCatalog;
00005 import com.sleepycat.bind.serial.SerialBinding;
00006 import com.sleepycat.bind.tuple.StringBinding;
00007
00008 import com.sleepycat.db.Cursor;
00009 import com.sleepycat.db.CursorConfig;
00010 import com.sleepycat.db.Database;
00011 import com.sleepycat.db.DatabaseEntry;
00012 import com.sleepycat.db.DatabaseException;
00013 import com.sleepycat.db.DeadlockException;
00014 import com.sleepycat.db.Environment;
00015 import com.sleepycat.db.LockMode;
00016 import com.sleepycat.db.OperationStatus;
00017 import com.sleepycat.db.Transaction;
00018
00019 import java.io.UnsupportedEncodingException;
00020 import java.util.Random;
00021
00022 public class DBWriter extends Thread
00023 {
00024 private Database myDb = null;
00025 private Environment myEnv = null;
00026 private EntryBinding dataBinding = null;
00027 private Random generator = new Random();
00028 private boolean passTxn = false;
00029
00030
00031 private static final int MAX_RETRY = 20;
00032
00033 private static String[] keys = {"key 1", "key 2", "key 3",
00034 "key 4", "key 5", "key 6",
00035 "key 7", "key 8", "key 9",
00036 "key 10"};
00037
00038
00039
00040
00041
00042 DBWriter(Environment env, Database db, StoredClassCatalog scc,
00043 boolean passtxn)
00044
00045 throws DatabaseException {
00046 myDb = db;
00047 myEnv = env;
00048 dataBinding = new SerialBinding(scc, PayloadData.class);
00049
00050 passTxn = passtxn;
00051 }
00052
00053
00054 DBWriter(Environment env, Database db, StoredClassCatalog scc)
00055
00056 throws DatabaseException {
00057 myDb = db;
00058 myEnv = env;
00059 dataBinding = new SerialBinding(scc, PayloadData.class);
00060 }
00061
00062
00063
00064
00065 public void run () {
00066 Transaction txn = null;
00067
00068
00069 for (int i=0; i<50; i++) {
00070
00071 boolean retry = true;
00072 int retry_count = 0;
00073
00074 while (retry) {
00075
00076
00077 try {
00078
00079
00080 txn = myEnv.beginTransaction(null, null);
00081
00082
00083
00084 for (int j = 0; j < 10; j++) {
00085
00086 DatabaseEntry key = new DatabaseEntry();
00087 StringBinding.stringToEntry(keys[j], key);
00088
00089
00090 PayloadData pd = new PayloadData(i+j, getName(),
00091 generator.nextDouble());
00092 DatabaseEntry data = new DatabaseEntry();
00093 dataBinding.objectToEntry(pd, data);
00094
00095
00096 myDb.put(txn, key, data);
00097 }
00098
00099
00100 System.out.println(getName() + " : committing txn : " + i);
00101
00102
00103
00104
00105
00106
00107
00108
00109 Transaction txnHandle = null;
00110 if (passTxn) { txnHandle = txn; }
00111
00112 System.out.println(getName() + " : Found " +
00113 countRecords(txnHandle) + " records in the database.");
00114 try {
00115 txn.commit();
00116 txn = null;
00117 } catch (DatabaseException e) {
00118 System.err.println("Error on txn commit: " +
00119 e.toString());
00120 }
00121 retry = false;
00122
00123 } catch (DeadlockException de) {
00124 System.out.println("################# " + getName() +
00125 " : caught deadlock");
00126
00127 if (retry_count < MAX_RETRY) {
00128 System.err.println(getName() +
00129 " : Retrying operation.");
00130 retry = true;
00131 retry_count++;
00132 } else {
00133 System.err.println(getName() +
00134 " : out of retries. Giving up.");
00135 retry = false;
00136 }
00137 } catch (DatabaseException e) {
00138
00139 retry = false;
00140 System.err.println(getName() +
00141 " : caught exception: " + e.toString());
00142 System.err.println(getName() +
00143 " : errno: " + e.getErrno());
00144 e.printStackTrace();
00145 } finally {
00146 if (txn != null) {
00147 try {
00148 txn.abort();
00149 } catch (Exception e) {
00150 System.err.println("Error aborting transaction: " +
00151 e.toString());
00152 e.printStackTrace();
00153 }
00154 }
00155 }
00156 }
00157 }
00158 }
00159
00160
00161
00162
00163
00164
00165
00166
00167
00168
00169
00170
00171
00172
00173
00174
00175
00176
00177 private int countRecords(Transaction txn) throws DatabaseException {
00178 DatabaseEntry key = new DatabaseEntry();
00179 DatabaseEntry data = new DatabaseEntry();
00180 int count = 0;
00181 Cursor cursor = null;
00182
00183 try {
00184
00185 CursorConfig cc = new CursorConfig();
00186
00187
00188
00189 cc.setReadUncommitted(true);
00190 cursor = myDb.openCursor(txn, cc);
00191 while (cursor.getNext(key, data, LockMode.DEFAULT) ==
00192 OperationStatus.SUCCESS) {
00193
00194 count++;
00195 }
00196 } finally {
00197 if (cursor != null) {
00198 cursor.close();
00199 }
00200 }
00201
00202 return count;
00203
00204 }
00205 }