Chapter 6. PRACTICAL EXAMPLES

Table of Contents

6.1. Using Synchronous Transactions
6.2. Handling Errors and Retrying Transactions
6.3. Basic Scanning Example
6.4. Using Secondary Indexes in Scans
6.5. NDB API Event Handling Example

Abstract

This section provides code examples illustrating how to accomplish some basic tasks using the NDB API.

All of these examples can be compiled and run as provided, and produce sample output to demonstrate their effects.

6.1. Using Synchronous Transactions

Abstract

This example illustrates the use of synchronous transactions in the NDB API.

The source code for this example can be found in storage/ndb/ndbapi-examples/ndbapi_simple/ndbapi_simple.cpp in the MySQL 5.1 tree.

The correct output from this program is as follows:

 ATTR1  ATTR2
 0      10
 1       1
 2      12
 Detected that deleted tuple doesn't exist!
 4      14
 5       5
 6      16
 7       7
 8      18
 9       9
/* 
 *  ndbapi_simple.cpp: Using synchronous transactions in the NDB API.
 */

#include <mysql.h>
#include <NdbApi.hpp>

// Used for cout
#include <stdio.h>
#include <iostream>

static void run_application(MYSQL &, Ndb_cluster_connection &);

// PRINT_ERROR writes one line to std::cout containing the source file and
// line of the expansion site, followed by the given error code and message.
#define PRINT_ERROR(code,msg) \
  std::cout << "Error in " << __FILE__ << ", line: " << __LINE__ \
            << ", code: " << code \
            << ", msg: " << msg << "." << std::endl
// MYSQLERROR reports the last error recorded on a MYSQL handle (passed as an
// object, not as a pointer) and then terminates the program with exit(-1).
#define MYSQLERROR(mysql) { \
  PRINT_ERROR(mysql_errno(&mysql),mysql_error(&mysql)); \
  exit(-1); }
// APIERROR reports an error object exposing `code` and `message` members
// (the call sites pass the result of getNdbError()) and then terminates the
// program with exit(-1).
#define APIERROR(error) { \
  PRINT_ERROR(error.code,error.message); \
  exit(-1); }

int main()
{
  // ndb_init must be called first.
  ndb_init();

  //  Connect to the MySQL server and the cluster, 
  //  and run the application.
  {
    // Object representing the cluster.
    Ndb_cluster_connection cluster_connection;

    // Connect to the cluster management server (ndb_mgmd)
    if (cluster_connection.connect(
            4 /* retries               */,
            5 /* delay between retries */,
            1 /* verbose               */))
    {
      std::cout << "Cluster management server was not ready within 30 seconds.\n";
      exit(-1);
    }

    //  Connect and wait for the storage nodes (ndbd processes)
    if (cluster_connection.wait_until_ready(30,0) < 0)
    {
      std::cout << "Cluster was not ready within 30 secs.\n";
      exit(-1);
    }

    // Connect to the MySQL server.
    MYSQL mysql;
    if ( !mysql_init(&mysql) ) {
      std::cout << "mysql_init failed\n";
      exit(-1);
    }
    if ( !mysql_real_connect(&mysql, "localhost", "root", "", "",
           3306, "/tmp/mysql.sock", 0) )
      MYSQLERROR(mysql);
    
    // Run the application code.
    run_application(mysql, cluster_connection);
  }

  ndb_end(0);

  std::cout << "\nTo drop created table use:\n"
      << "echo \"drop table MYTABLENAME\" | mysql TEST_DB_1 -u root\n";

  return 0;
}

static void create_table(MYSQL &);
static void do_insert(Ndb &);
static void do_update(Ndb &);
static void do_delete(Ndb &);
static void do_read(Ndb &);

static void run_application(MYSQL &mysql,
                            Ndb_cluster_connection &cluster_connection)
{
  // ---- Step 1: prepare the schema through the MySQL C API. ----
  // The result of CREATE DATABASE is not checked; the USE statement that
  // follows is the one that must succeed.
  mysql_query(&mysql, "CREATE DATABASE TEST_DB_1");
  const int use_status = mysql_query(&mysql, "USE TEST_DB_1");
  if (use_status != 0)
    MYSQLERROR(mysql);
  create_table(mysql);

  // ---- Step 2: open the same database through the NDB API. ----
  Ndb myNdb(&cluster_connection, "TEST_DB_1");
  const int init_status = myNdb.init();
  if (init_status)
    APIERROR(myNdb.getNdbError());

  // ---- Step 3: run the example operations, in this order. ----
  do_insert(myNdb);
  do_update(myNdb);
  do_delete(myNdb);
  do_read(myNdb);
}

/*********************************************************************
 * Create the NDB table MYTABLENAME (ATTR1 primary key, ATTR2).      *
 * The statement carries no IF NOT EXISTS clause: any error returned *
 * by the server is reported through MYSQLERROR, which terminates    *
 * the program.                                                      *
 *********************************************************************/
static void create_table(MYSQL &mysql)
{
  static const char *const create_stmt =
      "CREATE TABLE"
      "  MYTABLENAME"
      "    (ATTR1 INT UNSIGNED NOT NULL PRIMARY KEY,"
      "     ATTR2 INT UNSIGNED NOT NULL)"
      "  ENGINE=NDB";

  const int status = mysql_query(&mysql, create_stmt);
  if (status != 0)
    MYSQLERROR(mysql);
}

/*******************************************************************
 * Insert the tuples (0,0),(1,1),...,(9,9) into MYTABLENAME.       *
 * Five transactions are used; transaction number t inserts the    *
 * two rows whose keys are t and t+5.                              *
 *******************************************************************/

static void do_insert(Ndb &myNdb)
{
  const NdbDictionary::Dictionary *dict = myNdb.getDictionary();
  const NdbDictionary::Table *table = dict->getTable("MYTABLENAME");
  if (table == NULL)
    APIERROR(dict->getNdbError());

  for (int txn = 0; txn < 5; ++txn) {
    NdbTransaction *trans = myNdb.startTransaction();
    if (trans == NULL)
      APIERROR(myNdb.getNdbError());

    // Two insert operations per transaction: key txn, then key txn+5.
    for (int offset = 0; offset <= 5; offset += 5) {
      const int key = txn + offset;

      NdbOperation *op = trans->getNdbOperation(table);
      if (op == NULL)
        APIERROR(trans->getNdbError());

      op->insertTuple();
      op->equal("ATTR1", key);
      op->setValue("ATTR2", key);
    }

    // Both inserts are sent and committed together.
    if (trans->execute(NdbTransaction::Commit) == -1)
      APIERROR(trans->getNdbError());

    myNdb.closeTransaction(trans);
  }
}
 
/*******************************************************************
 * For every even key (0, 2, 4, 6, 8) set ATTR2 to key + 10,       *
 * using one transaction per updated row.                          *
 *******************************************************************/

static void do_update(Ndb &myNdb)
{
  const NdbDictionary::Dictionary *dict = myNdb.getDictionary();
  const NdbDictionary::Table *table = dict->getTable("MYTABLENAME");
  if (table == NULL)
    APIERROR(dict->getNdbError());

  for (int step = 0; step < 5; ++step) {
    const int key = 2 * step;

    NdbTransaction *trans = myNdb.startTransaction();
    if (trans == NULL)
      APIERROR(myNdb.getNdbError());

    NdbOperation *op = trans->getNdbOperation(table);
    if (op == NULL)
      APIERROR(trans->getNdbError());

    op->updateTuple();
    op->equal("ATTR1", key);
    op->setValue("ATTR2", key + 10);

    if (trans->execute(NdbTransaction::Commit) == -1)
      APIERROR(trans->getNdbError());

    myNdb.closeTransaction(trans);
  }
}
  
/**********************************************************
 * Remove the single row whose primary key (ATTR1) is 3,  *
 * in a transaction of its own.                           *
 **********************************************************/

static void do_delete(Ndb &myNdb)
{
  const int doomed_key = 3;

  const NdbDictionary::Dictionary *dict = myNdb.getDictionary();
  const NdbDictionary::Table *table = dict->getTable("MYTABLENAME");
  if (table == NULL)
    APIERROR(dict->getNdbError());

  NdbTransaction *trans = myNdb.startTransaction();
  if (trans == NULL)
    APIERROR(myNdb.getNdbError());

  NdbOperation *op = trans->getNdbOperation(table);
  if (op == NULL)
    APIERROR(trans->getNdbError());

  op->deleteTuple();
  op->equal("ATTR1", doomed_key);

  if (trans->execute(NdbTransaction::Commit) == -1)
    APIERROR(trans->getNdbError());

  myNdb.closeTransaction(trans);
}

/*********************************************************************
 * Read the rows with keys 0..9 one at a time (one transaction per   *
 * key) and print ATTR1 and ATTR2. Key 3 is treated specially: a     *
 * failed read of that key prints a notice instead of aborting, and  *
 * no row is printed for it.                                         *
 *********************************************************************/
static void do_read(Ndb &myNdb)
{
  const NdbDictionary::Dictionary *dict = myNdb.getDictionary();
  const NdbDictionary::Table *table = dict->getTable("MYTABLENAME");
  if (table == NULL)
    APIERROR(dict->getNdbError());

  std::cout << "ATTR1 ATTR2" << std::endl;

  for (int key = 0; key < 10; ++key) {
    NdbTransaction *trans = myNdb.startTransaction();
    if (trans == NULL)
      APIERROR(myNdb.getNdbError());

    NdbOperation *op = trans->getNdbOperation(table);
    if (op == NULL)
      APIERROR(trans->getNdbError());

    op->readTuple(NdbOperation::LM_Read);
    op->equal("ATTR1", key);

    // The value of ATTR2 becomes available through this handle once the
    // transaction has been executed.
    NdbRecAttr *attr2 = op->getValue("ATTR2", NULL);
    if (attr2 == NULL)
      APIERROR(trans->getNdbError());

    const bool failed = (trans->execute(NdbTransaction::Commit) == -1);

    if (key == 3) {
      // This is the key removed by do_delete(); a failure here is the
      // expected outcome and is only reported.
      if (failed)
        std::cout << "Detected that deleted tuple doesn't exist!" << std::endl;
    } else {
      if (failed)
        APIERROR(trans->getNdbError());
      printf(" %2d    %2d\n", key, attr2->u_32_value());
    }

    myNdb.closeTransaction(trans);
  }
}

6.2. Handling Errors and Retrying Transactions

Abstract

This program demonstrates handling errors and retrying failed transactions using the NDB API.

The source code for this example can be found in storage/ndb/ndbapi-examples/ndbapi_retries/ndbapi_retries.cpp in the MySQL 5.1 tree.

There are many ways to program using the NDB API. In this example, we perform two inserts in the same transaction using NdbTransaction::execute(NoCommit).

In NDB API applications, there are two types of failures to be taken into account:

  1. Transaction failures: If non-permanent, these can be handled by re-executing the transaction.

  2. Application errors: These are indicated by APIERROR; they must be handled by the application programmer.

Important

Before executing this program, you should first run ndbapi_simple (see Section 6.1, “Using Synchronous Transactions”), in order to create the table MYTABLENAME.

/* 
 *  ndbapi_retries.cpp: Error handling and retrying transactions
 */

#include <NdbApi.hpp>

// Used for cout.
#include <iostream>  

// Used for sleep (use your own version of sleep).
#include <unistd.h>
#define TIME_TO_SLEEP_BETWEEN_TRANSACTION_RETRIES 1

//
//  APIERROR prints an NdbError object to std::cout: its code and message,
//  its status and classification, and the file and line of the expansion
//  site. Unlike a fatal-error macro it does NOT terminate the program;
//  the caller decides what to do after reporting.
//

#define APIERROR(error) \
  { std::cout << "API ERROR: " << error.code << " " << error.message \
              << std::endl \
              << "           " << "Status: " << error.status \
              << ", Classification: " << error.classification << std::endl\
              << "           " << "File: " << __FILE__ \
              << " (Line: " << __LINE__ << ")" << std::endl \
              ; \
  }

//
//  TRANSERROR prints all error info regarding an NdbTransaction: first the
//  transaction-level NdbError (code, message, status, classification, file
//  and line of the expansion site), then the per-operation errors via
//  printTransactionError(). The argument must be a pointer to the
//  transaction. The macro does not terminate the program.
//

#define TRANSERROR(ndbTransaction) \
  { NdbError error = ndbTransaction->getNdbError(); \
    std::cout << "TRANS ERROR: " << error.code << " " << error.message \
              << std::endl \
              << "           " << "Status: " << error.status \
              << ", Classification: " << error.classification << std::endl \
              << "           " << "File: " << __FILE__ \
              << " (Line: " << __LINE__ << ")" << std::endl \
              ; \
    printTransactionError(ndbTransaction); \
  }

//  Walks the completed operations of the given transaction and prints, for
//  each one, a 1-based sequence number followed by the code, message,
//  status and classification of that operation's NdbError.
void printTransactionError(NdbTransaction *ndbTransaction) {
  int opNumber = 1;

  for (const NdbOperation *op = ndbTransaction->getNextCompletedOperation(NULL);
       op != NULL;
       op = ndbTransaction->getNextCompletedOperation(op), ++opNumber) {
    const NdbError opError = op->getNdbError();

    std::cout << "           OPERATION " << opNumber << ": "
              << opError.code << " " << opError.message << std::endl
              << "           Status: " << opError.status
              << ", Classification: " << opError.classification << std::endl;
  }
}

//  Example insert: defines one insert of the row (id, id) in the given
//  transaction and sends it to the cluster without committing.
//  @param transactionId  Value stored in both ATTR1 (the key) and ATTR2
//  @param myTransaction  NdbTransaction the insert is added to
//  @param myTable        Table to insert into
//  @return -1 if no operation object could be obtained; otherwise the
//          result of executing the transaction with NoCommit.
//  If defining the operation fails, the error is printed and the program
//  exits.
//
int insert(int transactionId, NdbTransaction* myTransaction,
     const NdbDictionary::Table *myTable) {
  NdbOperation *op = myTransaction->getNdbOperation(myTable);
  if (op == NULL)
    return -1;

  // Each definition step is attempted only if the previous one succeeded.
  bool defineFailed = (op->insertTuple() != 0);
  if (!defineFailed)
    defineFailed = (op->equal("ATTR1", transactionId) != 0);
  if (!defineFailed)
    defineFailed = (op->setValue("ATTR2", transactionId) != 0);

  if (defineFailed) {
    APIERROR(op->getNdbError());
    exit(-1);
  }

  return myTransaction->execute(NdbTransaction::NoCommit);
}

//  Runs one insert transaction (two inserts followed by a commit) and
//  re-runs it while the failure is reported as temporary, up to 10
//  attempts, sleeping between attempts.
//  @param transactionId  Base value for the inserted rows; the second row
//                        uses 10000 + transactionId
//  @param myNdb          Ndb object used to start/close transactions
//  @param myTable        Table to insert into
//  @return 1 on success. On any final failure the program exits, so no
//          other value is ever returned to the caller.
//
int executeInsertTransaction(int transactionId, Ndb* myNdb,
           const NdbDictionary::Table *myTable) {
  const int kPending = 0;     // no outcome yet, (re)try
  const int kSucceeded = 1;
  const int kFailed = -1;

  int outcome = kPending;
  int attemptsLeft = 10;

  while (attemptsLeft > 0 && outcome == kPending) {
    NdbError lastError;

    // ---- Start and execute the transaction. ----
    NdbTransaction *trans = myNdb->startTransaction();
    if (trans == NULL) {
      APIERROR(myNdb->getNdbError());
      lastError = myNdb->getNdbError();
      outcome = kFailed;
    } else {
      // Stops at the first step that reports a non-zero result.
      const bool failed =
          insert(transactionId, trans, myTable) ||
          insert(10000+transactionId, trans, myTable) ||
          trans->execute(NdbTransaction::Commit);
      if (failed) {
        TRANSERROR(trans);
        lastError = trans->getNdbError();
        outcome = kFailed;
      } else {
        outcome = kSucceeded;
      }
    }

    // ---- On failure: decide whether another attempt makes sense. ----
    if (outcome == kFailed) {
      const NdbError::Status status = lastError.status;
      if (status == NdbError::TemporaryError) {
        std::cout << "Retrying transaction..." << std::endl;
        sleep(TIME_TO_SLEEP_BETWEEN_TRANSACTION_RETRIES);
        --attemptsLeft;
        outcome = kPending;   // go round the loop again
      } else if (status == NdbError::UnknownResult ||
                 status == NdbError::PermanentError) {
        std::cout << "No retry of transaction..." << std::endl;
      }
      // Any other status: stay failed without printing anything.
    }

    // ---- Close the transaction, if one was started. ----
    if (trans != NULL) {
      myNdb->closeTransaction(trans);
    }
  }

  if (outcome != kSucceeded) exit(-1);
  return outcome;
}


/*
 * Program entry point.
 *
 * Connects to the cluster, opens database TEST_DB_1, looks up table
 * MYTABLENAME and runs 10000 insert transactions (ids 10000..19999)
 * through executeInsertTransaction().
 *
 * Returns 0 on success and -1 if the table cannot be found; connection
 * and initialisation failures terminate the program with exit(-1).
 */
int main()
{
  ndb_init();

  Ndb_cluster_connection *cluster_connection=
    new Ndb_cluster_connection(); // Object representing the cluster.

  int r= cluster_connection->connect(
                      5 /* retries               */,
                      3 /* delay between retries */,
                      1 /* verbose               */);
  if (r > 0)
  {
    std::cout
      << "Cluster connection failed, possibly resolved with more retries.\n";
    exit(-1);
  }
  else if (r < 0)
  {
    std::cout
      << "Cluster connection failed.\n";
    exit(-1);
  }
             
  if (cluster_connection->wait_until_ready(30,30))
  {
    std::cout << "Cluster was not ready within 30 seconds." << std::endl;
    exit(-1);
  }

  Ndb* myNdb= new Ndb( cluster_connection,
           "TEST_DB_1" );  // Object representing the database.
  
  if (myNdb->init() == -1) {
    APIERROR(myNdb->getNdbError());
    exit(-1);
  }

  const NdbDictionary::Dictionary* myDict= myNdb->getDictionary();
  const NdbDictionary::Table *myTable= myDict->getTable("MYTABLENAME");
  if (myTable == NULL)
  {
    APIERROR(myDict->getNdbError());
    // Tear down in the same order as the normal exit path below, so that
    // this early return neither leaks the objects nor skips ndb_end().
    delete myNdb;
    delete cluster_connection;
    ndb_end(0);
    return -1;
  }
  /*************************************
   * Execute some insert transactions. *
   *************************************/
  for (int i = 10000; i < 20000; i++) {
    executeInsertTransaction(i, myNdb, myTable);
  }
  
  delete myNdb;
  delete cluster_connection;

  ndb_end(0);
  return 0;
}

6.3. Basic Scanning Example

Abstract

This example illustrates how to use the NDB scanning API. It shows how to perform a scan, how to scan for an update, and how to scan for a delete, making use of the NdbScanFilter and NdbScanOperation classes.

(See Section 3.8, “The NdbScanFilter Class”, and Section 3.6.4, “The NdbScanOperation Class”.)

The source code for this example may be found in the MySQL 5.1 tree, in the file storage/ndb/ndbapi-examples/ndbapi_scan/ndbapi_scan.cpp.

This example makes use of the following classes and methods:

/*
 * ndbapi_scan.cpp: Use of the NDB scanning API
 */


#include <mysql.h>
#include <mysqld_error.h>
#include <NdbApi.hpp>
// Used for cout
#include <iostream>
#include <stdio.h>

/*
 * Block the calling thread for roughly the given number of milliseconds,
 * using select() with no file descriptors as a portable sub-second sleep.
 * Negative values are treated as zero.
 */
static void
milliSleep(int milliseconds){
  if (milliseconds < 0)
    milliseconds = 0;   // a negative timeout is not a valid select() argument

  struct timeval sleeptime;
  sleeptime.tv_sec = milliseconds / 1000;
  // Remainder below one second, converted from milliseconds to
  // microseconds (1 ms == 1000 us) so that tv_usec stays below 1000000.
  sleeptime.tv_usec = (milliseconds % 1000) * 1000;
  select(0, 0, 0, 0, &sleeptime);
}


// PRINT_ERROR writes one line to std::cout containing the source file and
// line of the expansion site, followed by the given error code and message.
#define PRINT_ERROR(code,msg) \
  std::cout << "Error in " << __FILE__ << ", line: " << __LINE__ \
            << ", code: " << code \
            << ", msg: " << msg << "." << std::endl
// MYSQLERROR reports the last error recorded on a MYSQL handle (passed as an
// object, not as a pointer) and then terminates the program with exit(-1).
#define MYSQLERROR(mysql) { \
  PRINT_ERROR(mysql_errno(&mysql),mysql_error(&mysql)); \
  exit(-1); }
// APIERROR reports an error object exposing `code` and `message` members
// (the call sites pass the result of getNdbError()) and then terminates the
// program with exit(-1).
#define APIERROR(error) { \
  PRINT_ERROR(error.code,error.message); \
  exit(-1); }

/*
 * In-memory image of one row of the GARAGE table. The two character
 * buffers are sized to match the CHAR(20) columns BRAND and COLOR.
 * A default-constructed Car has reg_no == 0 and both buffers filled with
 * zero bytes, so any shorter string written into them stays terminated.
 */
struct Car 
{
  // Value-initialise every member instead of memset()-ing `this`; this
  // needs no <string.h> and zeroes the same fields.
  Car() : reg_no(0), brand(), color() {}
  
  unsigned int reg_no;  // REG_NO column (primary key)
  char brand[20];       // BRAND column
  char color[20];       // COLOR column
};

/*
 * Create the example table GARAGE. If the server reports that the table
 * already exists, the table is dropped and creation is attempted again;
 * any other error is reported through MYSQLERROR, which terminates the
 * program. Always returns 1.
 */
int create_table(MYSQL &mysql) 
{
  static const char *const create_stmt =
      "CREATE TABLE"
      "  GARAGE"
      "    (REG_NO INT UNSIGNED NOT NULL,"
      "     BRAND CHAR(20) NOT NULL,"
      "     COLOR CHAR(20) NOT NULL,"
      "     PRIMARY KEY USING HASH (REG_NO))"
      "  ENGINE=NDB";

  for (;;)
  {
    if (mysql_query(&mysql, create_stmt) == 0)
      break;                      // table created

    // Only "table exists" is recoverable.
    if (mysql_errno(&mysql) != ER_TABLE_EXISTS_ERROR)
      MYSQLERROR(mysql);

    std::cout << "MySQL Cluster already has the example table GARAGE. "
        << "Now dropping it..." << std::endl; 

    // Drop the stale table, then loop to create it afresh.
    if (mysql_query(&mysql, "DROP TABLE GARAGE") != 0)
      MYSQLERROR(mysql);
  }
  return 1;
}


/*
 * Fill the GARAGE table with 15 cars in a single transaction:
 * registration numbers 0-4 are blue Mercedes, 5-9 black BMWs and
 * 10-14 pink Toyotas.
 *
 * Returns 1 if the commit did not report -1, otherwise 0. Failure to
 * look up the table, start the transaction or obtain an operation is
 * reported through APIERROR, which terminates the program.
 */
int populate(Ndb * myNdb)
{
  // Brand and colour shared by each consecutive group of five cars.
  static const char *const groups[3][2] = {
    { "Mercedes", "Blue"  },
    { "BMW",      "Black" },
    { "Toyota",   "Pink"  }
  };
  const int carCount = 15;
  Car cars[carCount];

  const NdbDictionary::Dictionary *dict = myNdb->getDictionary();
  const NdbDictionary::Table *table = dict->getTable("GARAGE");
  if (table == NULL)
    APIERROR(dict->getNdbError());

  // Build the rows in memory first.
  for (int n = 0; n < carCount; ++n)
  {
    const char *const *group = groups[n / 5];
    cars[n].reg_no = n;
    sprintf(cars[n].brand, "%s", group[0]);
    sprintf(cars[n].color, "%s", group[1]);
  }

  NdbTransaction *trans = myNdb->startTransaction();
  if (trans == NULL)
    APIERROR(myNdb->getNdbError());

  // One insert operation per car, all inside the same transaction.
  for (int n = 0; n < carCount; ++n)
  {
    NdbOperation *op = trans->getNdbOperation(table);
    if (op == NULL)
      APIERROR(trans->getNdbError());
    op->insertTuple();
    op->equal("REG_NO", cars[n].reg_no);
    op->setValue("BRAND", cars[n].brand);
    op->setValue("COLOR", cars[n].color);
  }

  const int commitResult = trans->execute(NdbTransaction::Commit);

  trans->close();

  return (commitResult == -1) ? 0 : 1;
}

/*
 * Delete every row of GARAGE whose column number `column` equals `color`.
 *
 * The rows are found with an exclusive-lock scan restricted by an
 * NdbScanFilter; each batch of rows returned by the scan is marked for
 * deletion and then committed. If a temporary error is reported the whole
 * scan is retried, up to retryMax attempts in total.
 *
 * @param myNdb   Ndb object to run the scan on
 * @param column  column number the filter compares
 * @param color   value the column must equal for a row to be deleted
 * @return 0 when the scan has run to its end, -1 on a permanent error or
 *         when the retry limit is reached. Note that success is reported
 *         as 0, not as a positive value.
 */
int scan_delete(Ndb* myNdb, 
    int column,
    const char * color)
  
{
  int                  retryAttempt = 0;
  const int            retryMax = 10;
  int deletedRows = 0;
  int check;
  NdbError              err;
  NdbTransaction  *myTrans;
  NdbScanOperation  *myScanOp;

  const NdbDictionary::Dictionary* myDict= myNdb->getDictionary();
  const NdbDictionary::Table *myTable= myDict->getTable("GARAGE");

  if (myTable == NULL) 
    APIERROR(myDict->getNdbError());

  // Every pass through this loop is one complete attempt; `continue`
  // at this level means "start over with a fresh transaction".
  while (true)
  {
    if (retryAttempt >= retryMax)
    {
      std::cout << "ERROR: has retried this operation " << retryAttempt  << " times, failing!" << std::endl;
      return -1;
    }

    myTrans = myNdb->startTransaction();
    if (myTrans == NULL) 
    {
      err = myNdb->getNdbError();

      if (err.status == NdbError::TemporaryError)
      {
        milliSleep(50);
        retryAttempt++;
        continue;
      }
      std::cout <<  err.message << std::endl;
      return -1;
    }

    myScanOp = myTrans->getNdbScanOperation(myTable); 
    if (myScanOp == NULL) 
    {
      std::cout << myTrans->getNdbError().message << std::endl;
      myNdb->closeTransaction(myTrans);
      return -1;
    }

    // Exclusive locks, because the rows found are going to be deleted.
    if(myScanOp->readTuples(NdbOperation::LM_Exclusive) != 0)
    {
      std::cout << myTrans->getNdbError().message << std::endl;
      myNdb->closeTransaction(myTrans);
      return -1;
    } 
    
    // Restrict the scan to rows where <column> == <color>.
    NdbScanFilter filter(myScanOp) ;   
    if(filter.begin(NdbScanFilter::AND) < 0  || 
       filter.cmp(NdbScanFilter::COND_EQ, column, color) < 0 ||
       filter.end() < 0)
    {
      std::cout <<  myTrans->getNdbError().message << std::endl;
      myNdb->closeTransaction(myTrans);
      return -1;
    }    
    
    // Start the scan.
    if(myTrans->execute(NdbTransaction::NoCommit) != 0){      
      err = myTrans->getNdbError();    
      if(err.status == NdbError::TemporaryError){
        std::cout << myTrans->getNdbError().message << std::endl;
        myNdb->closeTransaction(myTrans);
        milliSleep(50);
        retryAttempt++;   // count this attempt so the retry limit applies
        continue;
      }
      std::cout << err.code << std::endl;
      std::cout << myTrans->getNdbError().code << std::endl;
      myNdb->closeTransaction(myTrans);
      return -1;
    }

    // Set when a temporary error has closed the transaction inside the
    // fetch loop. The loop must then be left WITHOUT touching myScanOp
    // again, because the scan belongs to the closed transaction.
    bool retry = false;

    // nextResult(true) fetches a new batch; nextResult(false) only walks
    // the rows already fetched.
    while(!retry && (check = myScanOp->nextResult(true)) == 0){
      do 
      {
        if (myScanOp->deleteCurrentTuple() != 0)
        {
          std::cout << myTrans->getNdbError().message << std::endl;
          myNdb->closeTransaction(myTrans);
          return -1;
        }
        deletedRows++;
        
      } while((check = myScanOp->nextResult(false)) == 0);
      
      // Commit the deletes of this batch.
      if(check != -1)
      {
        check = myTrans->execute(NdbTransaction::Commit);   
      }

      // NOTE(review): restart() is only invoked after a failure here;
      // confirm against the NDB API documentation whether it was meant
      // to follow a successful commit instead. Left as in the original.
      if(check == -1)
      {
        check = myTrans->restart();
      }

      err = myTrans->getNdbError();    
      if(check == -1 && err.status == NdbError::TemporaryError)
      {
        std::cout << myTrans->getNdbError().message << std::endl;
        myNdb->closeTransaction(myTrans);
        milliSleep(50);
        retry = true;
      }
    }

    if (retry)
    {
      retryAttempt++;
      continue;
    }

    std::cout << myTrans->getNdbError().message << std::endl;
    myNdb->closeTransaction(myTrans);
    return 0;
  }
}


/*
 * Set column number `update_column` to `after_color` in every row of
 * GARAGE where that column currently equals `before_color`.
 *
 * The rows are found with an exclusive-lock scan restricted by an
 * NdbScanFilter; the updates of each batch are sent with NoCommit and
 * everything is committed once the scan is finished. If a temporary error
 * is reported the whole scan is retried, up to retryMax attempts in total.
 *
 * @param myNdb          Ndb object to run the scan on
 * @param update_column  column number that is both filtered on and updated
 * @param before_color   value a row must currently have to be updated
 * @param after_color    new value written to the column
 * @return 0 when the updates were committed, -1 on a permanent error or
 *         when the retry limit is reached. Note that success is reported
 *         as 0, not as a positive value.
 */
int scan_update(Ndb* myNdb, 
    int update_column,
    const char * before_color,
    const char * after_color)
    
{
  int                  retryAttempt = 0;
  const int            retryMax = 10;
  int updatedRows = 0;
  int check;
  NdbError              err;
  NdbTransaction  *myTrans;
  NdbScanOperation  *myScanOp;

  const NdbDictionary::Dictionary* myDict= myNdb->getDictionary();
  const NdbDictionary::Table *myTable= myDict->getTable("GARAGE");

  if (myTable == NULL) 
    APIERROR(myDict->getNdbError());

  // Every pass through this loop is one complete attempt; `continue`
  // at this level means "start over with a fresh transaction".
  while (true)
  {

    if (retryAttempt >= retryMax)
    {
      std::cout << "ERROR: has retried this operation " << retryAttempt 
    << " times, failing!" << std::endl;
      return -1;
    }

    myTrans = myNdb->startTransaction();
    if (myTrans == NULL) 
    {
      err = myNdb->getNdbError();

      if (err.status == NdbError::TemporaryError)
      {
        milliSleep(50);
        retryAttempt++;
        continue;
      }
      std::cout <<  err.message << std::endl;
      return -1;
    }

    myScanOp = myTrans->getNdbScanOperation(myTable); 
    if (myScanOp == NULL) 
    {
      std::cout << myTrans->getNdbError().message << std::endl;
      myNdb->closeTransaction(myTrans);
      return -1;
    }

    // Exclusive locks, because the rows found are going to be updated.
    if( myScanOp->readTuples(NdbOperation::LM_Exclusive) ) 
    {
      std::cout << myTrans->getNdbError().message << std::endl;
      myNdb->closeTransaction(myTrans);
      return -1;
    } 

    // Restrict the scan to rows where <update_column> == <before_color>.
    NdbScanFilter filter(myScanOp) ;   
    if(filter.begin(NdbScanFilter::AND) < 0  || 
       filter.cmp(NdbScanFilter::COND_EQ, update_column, before_color) <0||
       filter.end() <0)
    {
      std::cout <<  myTrans->getNdbError().message << std::endl;
      myNdb->closeTransaction(myTrans);
      return -1;
    }    
    
    // Start the scan.
    if(myTrans->execute(NdbTransaction::NoCommit) != 0)
    {      
      err = myTrans->getNdbError();    
      if(err.status == NdbError::TemporaryError){
        std::cout << myTrans->getNdbError().message << std::endl;
        myNdb->closeTransaction(myTrans);
        milliSleep(50);
        retryAttempt++;   // count this attempt so the retry limit applies
        continue;
      }
      std::cout << myTrans->getNdbError().code << std::endl;
      myNdb->closeTransaction(myTrans);
      return -1;
    }

    // Set when a temporary error has closed the transaction inside the
    // fetch loop. The loop must then be left WITHOUT touching myScanOp
    // again, because the scan belongs to the closed transaction.
    bool retry = false;

    // nextResult(true) fetches a new batch; nextResult(false) only walks
    // the rows already fetched.
    while(!retry && (check = myScanOp->nextResult(true)) == 0){
      do {
        NdbOperation * myUpdateOp = myScanOp->updateCurrentTuple();
        if (myUpdateOp == 0)
        {
          std::cout << myTrans->getNdbError().message << std::endl;
          myNdb->closeTransaction(myTrans);
          return -1;
        }
        updatedRows++;

        myUpdateOp->setValue(update_column, after_color);
        
      } while((check = myScanOp->nextResult(false)) == 0);
      
      // Send the updates of this batch without committing yet.
      if(check != -1)
      {
        check = myTrans->execute(NdbTransaction::NoCommit);   
      }

      err = myTrans->getNdbError();    
      if(check == -1 && err.status == NdbError::TemporaryError)
      {
        std::cout << myTrans->getNdbError().message << std::endl;
        myNdb->closeTransaction(myTrans);
        milliSleep(50);
        retry = true;
      }
    }

    if (retry)
    {
      retryAttempt++;
      continue;
    }

    // Commit all updates made during the scan.
    if(myTrans->execute(NdbTransaction::Commit) == -1)
    {
      // Read the error of THIS commit; `err` may still hold an older one.
      err = myTrans->getNdbError();
      std::cout << err.message << std::endl;
      myNdb->closeTransaction(myTrans);
      if(err.status == NdbError::TemporaryError){
        milliSleep(50);
        retryAttempt++;
        continue;
      } 
      // A commit that failed permanently must not be reported as success.
      return -1;
    }

    std::cout << myTrans->getNdbError().message << std::endl;
    myNdb->closeTransaction(myTrans);
    return 0;    
  }
}



/*
 * Print every row of GARAGE to std::cout, one row per line with the
 * columns REG_NO, BRAND and COLOR separated by tabs.
 *
 * The rows are read with a committed-read scan. If a temporary error is
 * reported while starting the transaction or the scan, the attempt is
 * repeated, up to retryMax attempts in total.
 *
 * @param myNdb  Ndb object to run the scan on
 * @return 1 when the scan loop has finished, -1 on a permanent error or
 *         when the retry limit is reached.
 */
int scan_print(Ndb * myNdb)
{
  int                  retryAttempt = 0;
  const int            retryMax = 10;
  int fetchedRows = 0;
  int check;
  NdbError              err;
  NdbTransaction  *myTrans;
  NdbScanOperation  *myScanOp;
  /* The result from reading the attribute value 
     is made up of three columns:
     
     REG_NO, BRAND, and COLOR.
   */
  NdbRecAttr *      myRecAttr[3];   

  const NdbDictionary::Dictionary* myDict= myNdb->getDictionary();
  const NdbDictionary::Table *myTable= myDict->getTable("GARAGE");

  if (myTable == NULL) 
    APIERROR(myDict->getNdbError());

  // Every pass through this loop is one complete attempt.
  while (true)
  {

    if (retryAttempt >= retryMax)
    {
      std::cout << "ERROR: has retried this operation " << retryAttempt 
    << " times, failing!" << std::endl;
      return -1;
    }

    myTrans = myNdb->startTransaction();
    if (myTrans == NULL) 
    {
      err = myNdb->getNdbError();

      if (err.status == NdbError::TemporaryError)
      {
        milliSleep(50);
        retryAttempt++;
        continue;
      }
      std::cout << err.message << std::endl;
      return -1;
    }
    /*
     * Define a scan operation. 
     * (NDBAPI.)
     */
    myScanOp = myTrans->getNdbScanOperation(myTable); 
    if (myScanOp == NULL) 
    {
      std::cout << myTrans->getNdbError().message << std::endl;
      myNdb->closeTransaction(myTrans);
      return -1;
    }

    if( myScanOp->readTuples(NdbOperation::LM_CommittedRead) == -1)
    {
      std::cout << myTrans->getNdbError().message << std::endl;
      myNdb->closeTransaction(myTrans);
      return -1;
    } 

    // Ask for the three columns; the handles are read in the fetch loop
    // below, once per row.
    myRecAttr[0] = myScanOp->getValue("REG_NO");
    myRecAttr[1] = myScanOp->getValue("BRAND");
    myRecAttr[2] = myScanOp->getValue("COLOR");
    if(myRecAttr[0] ==NULL || myRecAttr[1] == NULL || myRecAttr[2]==NULL) 
    {
      std::cout << myTrans->getNdbError().message << std::endl;
      myNdb->closeTransaction(myTrans);
      return -1;
    }
    
    // Start the scan.
    if(myTrans->execute(NdbTransaction::NoCommit) != 0){      
      err = myTrans->getNdbError();    
      if(err.status == NdbError::TemporaryError){
        std::cout << myTrans->getNdbError().message << std::endl;
        myNdb->closeTransaction(myTrans);
        milliSleep(50);
        retryAttempt++;   // count this attempt so the retry limit applies
        continue;
      }
      std::cout << err.code << std::endl;
      std::cout << myTrans->getNdbError().code << std::endl;
      myNdb->closeTransaction(myTrans);
      return -1;
    }
    
    // nextResult(true) fetches a new batch; nextResult(false) only walks
    // the rows already fetched.
    while((check = myScanOp->nextResult(true)) == 0){
      do {
        
        fetchedRows++;
        std::cout << myRecAttr[0]->u_32_value() << "\t";

        std::cout << myRecAttr[1]->aRef() << "\t";

        std::cout << myRecAttr[2]->aRef() << std::endl;

      } while((check = myScanOp->nextResult(false)) == 0);

    }    
    myNdb->closeTransaction(myTrans);
    return 1;
  }
}


int main()
{
  ndb_init();
  MYSQL mysql;

  /**************************************************************
   * Connect to the MySQL server, and create the table.         *
   **************************************************************/
  {
    if ( !mysql_init(&mysql) ) {
      std::cout << "mysql_init failed\n";
      exit(-1);
    }
    if ( !mysql_real_connect(&mysql, "localhost", "root", "", "",
           3306, "/tmp/mysql.sock", 0) )
      MYSQLERROR(mysql);

    mysql_query(&mysql, "CREATE DATABASE TEST_DB");
    if (mysql_query(&mysql, "USE TEST_DB") != 0) MYSQLERROR(mysql);

    create_table(mysql);
  }

  /**************************************************************
   * Connect to NDB Cluster.                                    *
   **************************************************************/

  Ndb_cluster_connection cluster_connection;
  if (cluster_connection.connect(4, 5, 1))
  {
    std::cout << "Unable to connect to cluster within 30 secs." << std::endl;
    exit(-1);
  }
  // Connect and wait for the storage nodes (ndbd processes).
  if (cluster_connection.wait_until_ready(30,0) < 0)
  {
    std::cout << "Cluster was not ready within 30 secs.\n";
    exit(-1);
  }

  Ndb myNdb(&cluster_connection,"TEST_DB");
  if (myNdb.init(1024) == -1) {      // Set max 1024  parallel transactions
    APIERROR(myNdb.getNdbError());
    exit(-1);
  }

  /*******************************************
   * Check table definition.                 *
   *******************************************/
  int column_color;
  {
    const NdbDictionary::Dictionary* myDict= myNdb.getDictionary();
    const NdbDictionary::Table *t= myDict->getTable("GARAGE");

    Car car;
    if (t->getColumn("COLOR")->getLength() != sizeof(car.color) ||
  t->getColumn("BRAND")->getLength() != sizeof(car.brand))
    {
      std::cout << "Wrong table definition" << std::endl;
      exit(-1);
    }
    column_color= t->getColumn("COLOR")->getColumnNo();
  }

  if(populate(&myNdb) > 0)
    std::cout << "populate: Success!" << std::endl;
  
  if(scan_print(&myNdb) > 0)
    std::cout << "scan_print: Success!" << std::endl  << std::endl;
  
  std::cout << "Going to delete all pink cars!" << std::endl;
  
  {
    Car tmp;
    sprintf(tmp.color, "Pink");
    if(scan_delete(&myNdb, column_color, tmp.color) > 0)
      std::cout << "scan_delete: Success!" << std::endl  << std::endl;
  }

  if(scan_print(&myNdb) > 0)
    std::cout << "scan_print: Success!" << std::endl  << std::endl;
  
  {
    Car tmp1, tmp2;
    sprintf(tmp1.color, "Blue");
    sprintf(tmp2.color, "Black");
    std::cout << "Going to update all " << tmp1.color 
        << " cars to " << tmp2.color << " cars!" << std::endl;
    if(scan_update(&myNdb, column_color, tmp1.color, tmp2.color) > 0) 
      std::cout << "scan_update: Success!" << std::endl  << std::endl;
  }
  if(scan_print(&myNdb) > 0)
    std::cout << "scan_print: Success!" << std::endl  << std::endl;

  return 0;
}

6.4. Using Secondary Indexes in Scans

Abstract

This program illustrates how to use secondary indexes in the NDB API.

The source code for this example may be found in the MySQL 5.1 source tree, in storage/ndb/ndbapi-examples/ndbapi_simple_index/ndbapi_simple_index.cpp.

The correct output from this program is shown here:

ATTR1 ATTR2
0      10
1       1
2      12
Detected that deleted tuple doesn't exist!
4      14
5       5
6      16
7       7
8      18
9       9
/*
 *  ndbapi_simple_index.cpp: Using secondary indexes in the NDB API.
 */

#include <mysql.h>
#include <NdbApi.hpp>

// Used for cout
#include <stdio.h>
#include <iostream>

// PRINT_ERROR(code, msg): write the source location together with an error
// code and message to std::cout. Expands to a single expression, so the
// caller supplies the terminating semicolon.
#define PRINT_ERROR(code,msg) \
  std::cout << "Error in " << __FILE__ << ", line: " << __LINE__ \
            << ", code: " << (code) \
            << ", msg: " << (msg) << "." << std::endl
// MYSQLERROR(mysql): report the last error recorded on the MYSQL handle
// `mysql` (passed as an object, not a pointer) and terminate with exit(-1).
// The do { } while (0) wrapper makes the macro behave as one statement, so
// "if (x) MYSQLERROR(m); else ..." parses as intended.
#define MYSQLERROR(mysql) do { \
  PRINT_ERROR(mysql_errno(&(mysql)),mysql_error(&(mysql))); \
  exit(-1); } while (0)
// APIERROR(error): report an error object exposing `.code` and `.message`
// and terminate with exit(-1). Note that `error` is evaluated twice.
#define APIERROR(error) do { \
  PRINT_ERROR((error).code,(error).message); \
  exit(-1); } while (0)

/*
 * Entry point of the secondary-index example.
 *
 * Steps, in order:
 *   1. Connect to the MySQL server and create MYTABLENAME with a unique
 *      hash index MYINDEXNAME on ATTR2.
 *   2. Connect to the cluster and look up the table and index objects.
 *   3. Insert ten tuples, read them back through the index, add 10 to
 *      ATTR2 of every second tuple, delete one tuple, and read all
 *      tuples again by primary key.
 *   4. Drop the table and release the NDB objects.
 *
 * Returns 0 on success. Every checked failure terminates the process
 * with exit(-1).
 *
 * NOTE(review): the MYSQL handle is never passed to mysql_close() before
 * the program returns - confirm whether that is intentional.
 */
int main()
{
  ndb_init();
  MYSQL mysql;

  /**************************************************************
   * Connect to the MySQL server and create the table           *
   **************************************************************/
  {
    if ( !mysql_init(&mysql) ) {
      std::cout << "mysql_init failed\n";
      exit(-1);
    }
    if ( !mysql_real_connect(&mysql, "localhost", "root", "", "",
                             3306, "/tmp/mysql.sock", 0) )
      MYSQLERROR(mysql);

    // The result of CREATE DATABASE is not checked (presumably because the
    // database may already exist); the USE statement that follows is.
    mysql_query(&mysql, "CREATE DATABASE TEST_DB_1");
    if (mysql_query(&mysql, "USE TEST_DB_1") != 0) MYSQLERROR(mysql);

    if (mysql_query(&mysql,
                    "CREATE TABLE"
                    "  MYTABLENAME"
                    "    (ATTR1 INT UNSIGNED,"
                    "     ATTR2 INT UNSIGNED NOT NULL,"
                    "     PRIMARY KEY USING HASH (ATTR1),"
                    "     UNIQUE MYINDEXNAME USING HASH (ATTR2))"
                    "  ENGINE=NDB"))
      MYSQLERROR(mysql);
  }

  /**************************************************************
   * Connect to the NDB cluster.                                *
   **************************************************************/

  Ndb_cluster_connection *cluster_connection=
    new Ndb_cluster_connection(); // Object representing the cluster

  if (cluster_connection->connect(5,3,1))
  {
    std::cout << "Connect to cluster management server failed.\n";
    exit(-1);
  }

  if (cluster_connection->wait_until_ready(30,30))
  {
    std::cout << "Cluster was not ready within 30 seconds.\n";
    exit(-1);
  }

  Ndb* myNdb = new Ndb( cluster_connection,
                        "TEST_DB_1" );  // Object representing the database.
  // APIERROR terminates the process itself; no further exit() is needed.
  if (myNdb->init() == -1)
    APIERROR(myNdb->getNdbError());

  const NdbDictionary::Dictionary* myDict= myNdb->getDictionary();
  const NdbDictionary::Table *myTable= myDict->getTable("MYTABLENAME");
  if (myTable == NULL)
    APIERROR(myDict->getNdbError());
  const NdbDictionary::Index *myIndex=
    myDict->getIndex("MYINDEXNAME","MYTABLENAME");
  if (myIndex == NULL)
    APIERROR(myDict->getNdbError());

  /***********************************************
   * Using 5 transactions, insert 10 tuples into *
   * the table: (0,0),(1,1),...,(9,9)            *
   ***********************************************/

  for (int i = 0; i < 5; i++) {
    NdbTransaction *myTransaction= myNdb->startTransaction();
    if (myTransaction == NULL) APIERROR(myNdb->getNdbError());

    // First insert of this transaction: tuple (i, i).
    NdbOperation *myOperation= myTransaction->getNdbOperation(myTable);
    if (myOperation == NULL) APIERROR(myTransaction->getNdbError());

    myOperation->insertTuple();
    myOperation->equal("ATTR1", i);
    myOperation->setValue("ATTR2", i);

    // Second insert of this transaction: tuple (i+5, i+5).
    myOperation = myTransaction->getNdbOperation(myTable);
    if (myOperation == NULL) APIERROR(myTransaction->getNdbError());

    myOperation->insertTuple();
    myOperation->equal("ATTR1", i+5);
    myOperation->setValue("ATTR2", i+5);

    if (myTransaction->execute( NdbTransaction::Commit ) == -1)
      APIERROR(myTransaction->getNdbError());

    myNdb->closeTransaction(myTransaction);
  }

  /**********************************************
   * Read and print all tuples using the index. *
   **********************************************/
  std::cout << "ATTR1 ATTR2" << std::endl;

  for (int i = 0; i < 10; i++) {
    NdbTransaction *myTransaction= myNdb->startTransaction();
    if (myTransaction == NULL) APIERROR(myNdb->getNdbError());

    NdbIndexOperation *myIndexOperation=
      myTransaction->getNdbIndexOperation(myIndex);
    if (myIndexOperation == NULL) APIERROR(myTransaction->getNdbError());

    // Look the tuple up by its indexed column and fetch the primary key.
    myIndexOperation->readTuple(NdbOperation::LM_Read);
    myIndexOperation->equal("ATTR2", i);

    NdbRecAttr *myRecAttr= myIndexOperation->getValue("ATTR1", NULL);
    if (myRecAttr == NULL) APIERROR(myTransaction->getNdbError());

    // A failed execute is not reported here; the row is simply not printed.
    // u_32_value() is unsigned, hence %u for the first field.
    if (myTransaction->execute( NdbTransaction::Commit ) != -1)
      printf(" %2u    %2d\n", myRecAttr->u_32_value(), i);

    myNdb->closeTransaction(myTransaction);
  }

  /******************************************************************
   * Update the second attribute in half of the tuples (adding 10). *
   ******************************************************************/

  for (int i = 0; i < 10; i+=2) {
    NdbTransaction *myTransaction= myNdb->startTransaction();
    if (myTransaction == NULL) APIERROR(myNdb->getNdbError());

    NdbIndexOperation *myIndexOperation=
      myTransaction->getNdbIndexOperation(myIndex);
    if (myIndexOperation == NULL) APIERROR(myTransaction->getNdbError());

    // The tuple is located through the index (ATTR2 == i) and the same
    // column is then overwritten with i+10.
    myIndexOperation->updateTuple();
    myIndexOperation->equal( "ATTR2", i );
    myIndexOperation->setValue( "ATTR2", i+10);

    if (myTransaction->execute( NdbTransaction::Commit ) == -1)
      APIERROR(myTransaction->getNdbError());

    myNdb->closeTransaction(myTransaction);
  }

  /*****************************************************************
   * Delete one tuple through the index: the one whose ATTR2 is 3. *
   * ATTR2 of odd tuples was not updated above, so this is also    *
   * the tuple whose primary key (ATTR1) equals 3.                 *
   *****************************************************************/

  {
    NdbTransaction *myTransaction= myNdb->startTransaction();
    if (myTransaction == NULL) APIERROR(myNdb->getNdbError());

    NdbIndexOperation *myIndexOperation=
      myTransaction->getNdbIndexOperation(myIndex);
    if (myIndexOperation == NULL) APIERROR(myTransaction->getNdbError());

    myIndexOperation->deleteTuple();
    myIndexOperation->equal( "ATTR2", 3 );

    if (myTransaction->execute(NdbTransaction::Commit) == -1)
      APIERROR(myTransaction->getNdbError());

    myNdb->closeTransaction(myTransaction);
  }

  /**********************************
   * Read and print out all tuples. *
   **********************************/
  {
    std::cout << "ATTR1 ATTR2" << std::endl;

    for (int i = 0; i < 10; i++) {
      NdbTransaction *myTransaction= myNdb->startTransaction();
      if (myTransaction == NULL) APIERROR(myNdb->getNdbError());

      NdbOperation *myOperation= myTransaction->getNdbOperation(myTable);
      if (myOperation == NULL) APIERROR(myTransaction->getNdbError());

      // This time the lookup goes through the primary key.
      myOperation->readTuple(NdbOperation::LM_Read);
      myOperation->equal("ATTR1", i);

      NdbRecAttr *myRecAttr= myOperation->getValue("ATTR2", NULL);
      if (myRecAttr == NULL) APIERROR(myTransaction->getNdbError());

      // A failure is expected only for the tuple deleted above (i == 3);
      // any other failure is fatal. The braces make the pairing of the
      // else branch with the inner test explicit.
      if (myTransaction->execute( NdbTransaction::Commit ) == -1) {
        if (i == 3) {
          std::cout << "Detected that deleted tuple doesn't exist!\n";
        } else {
          APIERROR(myTransaction->getNdbError());
        }
      }

      if (i != 3) {
        printf(" %2d    %2u\n", i, myRecAttr->u_32_value());
      }
      myNdb->closeTransaction(myTransaction);
    }
  }

  /*******************
   * Drop the table. *
   *******************/

  if (mysql_query(&mysql, "DROP TABLE MYTABLENAME"))
    MYSQLERROR(mysql);

  delete myNdb;
  delete cluster_connection;

  ndb_end(0);
  return 0;
}

6.5. NDB API Event Handling Example

Abstract

This example demonstrates NDB API event handling.

The source code for this program may be found in the MySQL 5.1 source tree, in the file storage/ndb/ndbapi-examples/ndbapi_event/ndbapi_event.cpp.

/*
 *  ndbapi_event.cpp: Illustrates event handling in the NDB API.
 */
#include <NdbApi.hpp>

// Used for cout.
#include <stdio.h>
#include <iostream>
#include <unistd.h>
#ifdef VM_TRACE
#include <my_global.h>
#endif
#ifndef assert
#include <assert.h>
#endif


// APIERROR(error): print the source location plus the `.code` and `.message`
// members of `error` to std::cout, then terminate with exit(-1).
// The do { } while (0) wrapper makes the macro behave as a single statement,
// so "if (x) APIERROR(e); else ..." parses as intended. Note that `error`
// is evaluated twice.
#define APIERROR(error) \
  do { std::cout << "Error in " << __FILE__ << ", line:" << __LINE__ << ", code:" \
              << (error).code << ", msg: " << (error).message << "." << std::endl; \
    exit(-1); } while (0)

// Forward declaration; the definition follows main().
// Creates the event `eventName` on table `eventTableName`, covering the
// `noEventColumnName` column names in `eventColumnName`; `merge_events`
// is passed on to the event's mergeEvents() setting.
int myCreateEvent(Ndb* myNdb,
      const char *eventName,
      const char *eventTableName,
      const char **eventColumnName,
      const int noEventColumnName,
                  bool merge_events);

int main(int argc, char** argv)
{
  ndb_init();
  bool merge_events = argc > 1 && strchr(argv[1], 'm') != 0;
#ifdef VM_TRACE
  bool dbug = argc > 1 && strchr(argv[1], 'd') != 0;
  if (dbug) DBUG_PUSH("d:t:");
  if (dbug) putenv("API_SIGNAL_LOG=-");
#endif

  Ndb_cluster_connection *cluster_connection=
    new Ndb_cluster_connection(); // Object representing the cluster.

  int r= cluster_connection->connect(
             5 /* retries               */,
             3 /* delay between retries */,
             1 /* verbose               */);
  if (r > 0)
  {
    std::cout
      << "Cluster connect failed, possibly resolved with more retries.\n";
    exit(-1);
  }
  else if (r < 0)
  {
    std::cout
      << "Cluster connect failed.\n";
    exit(-1);
  }
             
  if (cluster_connection->wait_until_ready(30,30))
  {
    std::cout << "Cluster was not ready within 30 seconds." << std::endl;
    exit(-1);
  }

  Ndb* myNdb= new Ndb(cluster_connection,
          "TEST_DB");  // Object representing the database.

  if (myNdb->init() == -1) APIERROR(myNdb->getNdbError());

  const char *eventName= "CHNG_IN_t0";
  const char *eventTableName= "t0";
  const int noEventColumnName= 5;
  const char *eventColumnName[noEventColumnName]=
    {"c0",
     "c1",
     "c2",
     "c3",
     "c4"
    };
  
  // Create events.
  myCreateEvent(myNdb,
    eventName,
    eventTableName,
    eventColumnName,
    noEventColumnName,
                merge_events);

  // Normal values and blobs are unfortunately handled differently..
  typedef union { NdbRecAttr* ra; NdbBlob* bh; } RA_BH;

  int i, j, k, l;
  j = 0;
  while (j < 99) {

    // Start "transaction" for handling events.
    NdbEventOperation* op;
    printf("Create EventOperation...\n");
    if ((op = myNdb->createEventOperation(eventName)) == NULL)
      APIERROR(myNdb->getNdbError());
    op->mergeEvents(merge_events);

    printf("Get values...\n");
    RA_BH recAttr[noEventColumnName];
    RA_BH recAttrPre[noEventColumnName];
    // Primary keys should always be a part of the result.
    for (i = 0; i < noEventColumnName; i++) {
      if (i < 4) {
        recAttr[i].ra    = op->getValue(eventColumnName[i]);
        recAttrPre[i].ra = op->getPreValue(eventColumnName[i]);
      } else if (merge_events) {
        recAttr[i].bh    = op->getBlobHandle(eventColumnName[i]);
        recAttrPre[i].bh = op->getPreBlobHandle(eventColumnName[i]);
      }
    }

    // Set up the callbacks.
    printf("Execute...\n");
    // This causes the changes to start "flowing".
    if (op->execute())
      APIERROR(op->getNdbError());

    NdbEventOperation* the_op = op;

    i= 0;
    while (i < 40) {
      // printf("Now waiting for event...\n");
      int r = myNdb->pollEvents(1000);  //  Wait for the event (maximum 
                                        //  1000 ms).
      if (r > 0) {
  // printf("Got data! %d\n", r);
  while ((op= myNdb->nextEvent())) {
          assert(the_op == op);
    i++;
    switch (op->getEventType()) {
    case NdbDictionary::Event::TE_INSERT:
      printf("%u INSERT", i);
      break;
    case NdbDictionary::Event::TE_DELETE:
      printf("%u DELETE", i);
      break;
    case NdbDictionary::Event::TE_UPDATE:
      printf("%u UPDATE", i);
      break;
    default:
      abort(); // This should not happen.
    }
          printf(" gci=%d\n", (int)op->getGCI());
          for (k = 0; k <= 1; k++) {
            printf(k == 0 ? "post: " : "pre : ");
            for (l = 0; l < noEventColumnName; l++) {
              if (l < 4) {
                NdbRecAttr* ra = k == 0 ? recAttr[l].ra : recAttrPre[l].ra;
                if (ra->isNULL() >= 0) { // We have a value.
                  if (ra->isNULL() == 0) { // We have a non-null value.
                    if (l < 2)
                      printf("%-5u", ra->u_32_value());
                    else
                      printf("%-5.4s", ra->aRef());
                  } else
                    printf("%-5s", "NULL");
                } else
                  printf("%-5s", "-"); // no value
              } else if (merge_events) {
                int isNull;
                NdbBlob* bh = k == 0 ? recAttr[l].bh : recAttrPre[l].bh;
                bh->getDefined(isNull);
                if (isNull >= 0) { // We have a value.
                  if (! isNull) { // We have a non-null value.
                    Uint64 length = 0;
                    bh->getLength(length);
                    // Read into buffer.
                    unsigned char* buf = new unsigned char [length];
                    memset(buf, 'X', length);
                    Uint32 n = length;
                    bh->readData(buf, n); // n is in/out.
                    assert(n == length);
                    // pretty-print
                    bool first = true;
                    Uint32 i = 0;
                    while (i < n) {
                      unsigned char c = buf[i++];
                      Uint32 m = 1;
                      while (i < n && buf[i] == c)
                        i++, m++;
                      if (! first)
                        printf("+");
                      printf("%u%c", m, c);
                      first = false;
                    }
                    printf("[%u]", n);
                    delete [] buf;
                  } else
                    printf("%-5s", "NULL");
                } else
                  printf("%-5s", "-"); // No value.
              }
            }
            printf("\n");
          }
  }
      } else
  ;  //  printf("Timed out.\n");
    }
    // We don't want to listen to events anymore...
    if (myNdb->dropEventOperation(the_op)) APIERROR(myNdb->getNdbError());
    the_op = 0;

    j++;
  }

  {
    NdbDictionary::Dictionary *myDict = myNdb->getDictionary();
    if (!myDict) APIERROR(myNdb->getNdbError());
    // Remove the event from the database.
    if (myDict->dropEvent(eventName)) APIERROR(myDict->getNdbError());
  }

  delete myNdb;
  delete cluster_connection;
  ndb_end(0);
  return 0;
}

/*
 * Define an event on a table and register it in the dictionary.
 *
 * myNdb              - Ndb object whose dictionary is used.
 * eventName          - name of the event to create.
 * eventTableName     - table the event is attached to.
 * eventColumnNames   - names of the columns the event should cover.
 * noEventColumnNames - number of entries in eventColumnNames.
 * merge_events       - forwarded to the event's mergeEvents() setting.
 *
 * The event subscribes to TE_ALL. If creation fails because the object
 * already exists, the existing event is dropped and creation is retried
 * once. Any other failure terminates the process through APIERROR.
 *
 * Always returns 0.
 */
int myCreateEvent(Ndb* myNdb,
                  const char *eventName,
                  const char *eventTableName,
                  const char **eventColumnNames,
                  const int noEventColumnNames,
                  bool merge_events)
{
  NdbDictionary::Dictionary *dictionary= myNdb->getDictionary();
  if (dictionary == NULL) APIERROR(myNdb->getNdbError());

  const NdbDictionary::Table *eventTable=
    dictionary->getTable(eventTableName);
  if (eventTable == NULL) APIERROR(dictionary->getNdbError());

  // Describe the event: all table event types, on the requested columns.
  NdbDictionary::Event eventDef(eventName, *eventTable);
  eventDef.addTableEvent(NdbDictionary::Event::TE_ALL);
  eventDef.addEventColumns(noEventColumnNames, eventColumnNames);
  eventDef.mergeEvents(merge_events);

  // First attempt: on success print the definition and finish.
  if (dictionary->createEvent(eventDef) == 0) {
    eventDef.print();
    return 0;
  }

  // Only "already exists" is recoverable; everything else is fatal.
  if (dictionary->getNdbError().classification !=
      NdbError::SchemaObjectExists)
    APIERROR(dictionary->getNdbError());

  printf("Event creation failed; event already exists.\n");
  printf("Dropping event...\n");
  if (dictionary->dropEvent(eventName) != 0)
    APIERROR(dictionary->getNdbError());

  // Second and last attempt.
  if (dictionary->createEvent(eventDef) != 0)
    APIERROR(dictionary->getNdbError());

  return 0;
}