/* Copyright (C) 2003 MySQL AB This program is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License as published by the Free Software Foundation; either version 2 of the License, or (at your option) any later version. This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details. You should have received a copy of the GNU General Public License along with this program; if not, write to the Free Software Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */ /* * ndbapi_scan.cpp: * Illustrates how to use the scan api in the NDBAPI. * The example shows how to do scan, scan for update and scan for delete * using NdbScanFilter and NdbScanOperation * * Classes and methods used in this example: * * Ndb_cluster_connection * connect() * wait_until_ready() * * Ndb * init() * getDictionary() * startTransaction() * closeTransaction() * * NdbTransaction * getNdbScanOperation() * execute() * * NdbScanOperation * getValue() * readTuples() * nextResult() * deleteCurrentTuple() * updateCurrentTuple() * * const NdbDictionary::Dictionary * getTable() * * const NdbDictionary::Table * getColumn() * * const NdbDictionary::Column * getLength() * * NdbOperation * insertTuple() * equal() * setValue() * * NdbScanFilter * begin() * eq() * end() * */ #include #include #include // Used for cout #include #include /** * Helper sleep function */ static void milliSleep(int milliseconds){ struct timeval sleeptime; sleeptime.tv_sec = milliseconds / 1000; sleeptime.tv_usec = (milliseconds - (sleeptime.tv_sec * 1000)) * 1000000; select(0, 0, 0, 0, &sleeptime); } /** * Helper sleep function */ #define PRINT_ERROR(code,msg) \ std::cout << "Error in " << __FILE__ << ", line: " << __LINE__ \ << ", code: " << code \ << ", msg: " << msg << "." << std::endl #define MYSQLERROR(mysql) { \ PRINT_ERROR(mysql_errno(&mysql),mysql_error(&mysql)); \ exit(-1); } #define APIERROR(error) { \ PRINT_ERROR(error.code,error.message); \ exit(-1); } struct Car { /** * Note memset, so that entire char-fields are cleared * as all 20 bytes are significant (as type is char) */ Car() { memset(this, 0, sizeof(* this)); } unsigned int reg_no; char brand[20]; char color[20]; }; /** * Function to create table */ int create_table(MYSQL &mysql) { while (mysql_query(&mysql, "CREATE TABLE" " GARAGE" " (REG_NO INT UNSIGNED NOT NULL," " BRAND CHAR(20) NOT NULL," " COLOR CHAR(20) NOT NULL," " PRIMARY KEY USING HASH (REG_NO))" " ENGINE=NDB")) { if (mysql_errno(&mysql) != ER_TABLE_EXISTS_ERROR) MYSQLERROR(mysql); std::cout << "MySQL Cluster already has example table: GARAGE. " << "Dropping it..." << std::endl; /************** * Drop table * **************/ if (mysql_query(&mysql, "DROP TABLE GARAGE")) MYSQLERROR(mysql); } return 1; } int populate(Ndb * myNdb) { int i; Car cars[15]; const NdbDictionary::Dictionary* myDict= myNdb->getDictionary(); const NdbDictionary::Table *myTable= myDict->getTable("GARAGE"); if (myTable == NULL) APIERROR(myDict->getNdbError()); /** * Five blue mercedes */ for (i = 0; i < 5; i++) { cars[i].reg_no = i; sprintf(cars[i].brand, "Mercedes"); sprintf(cars[i].color, "Blue"); } /** * Five black bmw */ for (i = 5; i < 10; i++) { cars[i].reg_no = i; sprintf(cars[i].brand, "BMW"); sprintf(cars[i].color, "Black"); } /** * Five pink toyotas */ for (i = 10; i < 15; i++) { cars[i].reg_no = i; sprintf(cars[i].brand, "Toyota"); sprintf(cars[i].color, "Pink"); } NdbTransaction* myTrans = myNdb->startTransaction(); if (myTrans == NULL) APIERROR(myNdb->getNdbError()); for (i = 0; i < 15; i++) { NdbOperation* myNdbOperation = myTrans->getNdbOperation(myTable); if (myNdbOperation == NULL) APIERROR(myTrans->getNdbError()); myNdbOperation->insertTuple(); myNdbOperation->equal("REG_NO", cars[i].reg_no); myNdbOperation->setValue("BRAND", cars[i].brand); myNdbOperation->setValue("COLOR", cars[i].color); } int check = myTrans->execute(NdbTransaction::Commit); myTrans->close(); return check != -1; } int scan_delete(Ndb* myNdb, int column, const char * color) { // Scan all records exclusive and delete // them one by one int retryAttempt = 0; const int retryMax = 10; int deletedRows = 0; int check; NdbError err; NdbTransaction *myTrans; NdbScanOperation *myScanOp; const NdbDictionary::Dictionary* myDict= myNdb->getDictionary(); const NdbDictionary::Table *myTable= myDict->getTable("GARAGE"); if (myTable == NULL) APIERROR(myDict->getNdbError()); /** * Loop as long as : * retryMax not reached * failed operations due to TEMPORARY erros * * Exit loop; * retyrMax reached * Permanent error (return -1) */ while (true) { if (retryAttempt >= retryMax) { std::cout << "ERROR: has retried this operation " << retryAttempt << " times, failing!" << std::endl; return -1; } myTrans = myNdb->startTransaction(); if (myTrans == NULL) { const NdbError err = myNdb->getNdbError(); if (err.status == NdbError::TemporaryError) { milliSleep(50); retryAttempt++; continue; } std::cout << err.message << std::endl; return -1; } /** * Get a scan operation. */ myScanOp = myTrans->getNdbScanOperation(myTable); if (myScanOp == NULL) { std::cout << myTrans->getNdbError().message << std::endl; myNdb->closeTransaction(myTrans); return -1; } /** * Define a result set for the scan. */ if(myScanOp->readTuples(NdbOperation::LM_Exclusive) != 0) { std::cout << myTrans->getNdbError().message << std::endl; myNdb->closeTransaction(myTrans); return -1; } /** * Use NdbScanFilter to define a search critera */ NdbScanFilter filter(myScanOp) ; if(filter.begin(NdbScanFilter::AND) < 0 || filter.cmp(NdbScanFilter::COND_EQ, column, color) < 0 || filter.end() < 0) { std::cout << myTrans->getNdbError().message << std::endl; myNdb->closeTransaction(myTrans); return -1; } /** * Start scan (NoCommit since we are only reading at this stage); */ if(myTrans->execute(NdbTransaction::NoCommit) != 0){ err = myTrans->getNdbError(); if(err.status == NdbError::TemporaryError){ std::cout << myTrans->getNdbError().message << std::endl; myNdb->closeTransaction(myTrans); milliSleep(50); continue; } std::cout << err.code << std::endl; std::cout << myTrans->getNdbError().code << std::endl; myNdb->closeTransaction(myTrans); return -1; } /** * start of loop: nextResult(true) means that "parallelism" number of * rows are fetched from NDB and cached in NDBAPI */ while((check = myScanOp->nextResult(true)) == 0){ do { if (myScanOp->deleteCurrentTuple() != 0) { std::cout << myTrans->getNdbError().message << std::endl; myNdb->closeTransaction(myTrans); return -1; } deletedRows++; /** * nextResult(false) means that the records * cached in the NDBAPI are modified before * fetching more rows from NDB. */ } while((check = myScanOp->nextResult(false)) == 0); /** * Commit when all cached tuple have been marked for deletion */ if(check != -1) { check = myTrans->execute(NdbTransaction::Commit); } if(check == -1) { /** * Create a new transaction, while keeping scan open */ check = myTrans->restart(); } /** * Check for errors */ err = myTrans->getNdbError(); if(check == -1) { if(err.status == NdbError::TemporaryError) { std::cout << myTrans->getNdbError().message << std::endl; myNdb->closeTransaction(myTrans); milliSleep(50); continue; } } /** * End of loop */ } std::cout << myTrans->getNdbError().message << std::endl; myNdb->closeTransaction(myTrans); return 0; } if(myTrans!=0) { std::cout << myTrans->getNdbError().message << std::endl; myNdb->closeTransaction(myTrans); } return -1; } int scan_update(Ndb* myNdb, int update_column, const char * before_color, const char * after_color) { // Scan all records exclusive and update // them one by one int retryAttempt = 0; const int retryMax = 10; int updatedRows = 0; int check; NdbError err; NdbTransaction *myTrans; NdbScanOperation *myScanOp; const NdbDictionary::Dictionary* myDict= myNdb->getDictionary(); const NdbDictionary::Table *myTable= myDict->getTable("GARAGE"); if (myTable == NULL) APIERROR(myDict->getNdbError()); /** * Loop as long as : * retryMax not reached * failed operations due to TEMPORARY erros * * Exit loop; * retyrMax reached * Permanent error (return -1) */ while (true) { if (retryAttempt >= retryMax) { std::cout << "ERROR: has retried this operation " << retryAttempt << " times, failing!" << std::endl; return -1; } myTrans = myNdb->startTransaction(); if (myTrans == NULL) { const NdbError err = myNdb->getNdbError(); if (err.status == NdbError::TemporaryError) { milliSleep(50); retryAttempt++; continue; } std::cout << err.message << std::endl; return -1; } /** * Get a scan operation. */ myScanOp = myTrans->getNdbScanOperation(myTable); if (myScanOp == NULL) { std::cout << myTrans->getNdbError().message << std::endl; myNdb->closeTransaction(myTrans); return -1; } /** * Define a result set for the scan. */ if( myScanOp->readTuples(NdbOperation::LM_Exclusive) ) { std::cout << myTrans->getNdbError().message << std::endl; myNdb->closeTransaction(myTrans); return -1; } /** * Use NdbScanFilter to define a search critera */ NdbScanFilter filter(myScanOp) ; if(filter.begin(NdbScanFilter::AND) < 0 || filter.cmp(NdbScanFilter::COND_EQ, update_column, before_color) <0|| filter.end() <0) { std::cout << myTrans->getNdbError().message << std::endl; myNdb->closeTransaction(myTrans); return -1; } /** * Start scan (NoCommit since we are only reading at this stage); */ if(myTrans->execute(NdbTransaction::NoCommit) != 0) { err = myTrans->getNdbError(); if(err.status == NdbError::TemporaryError){ std::cout << myTrans->getNdbError().message << std::endl; myNdb->closeTransaction(myTrans); milliSleep(50); continue; } std::cout << myTrans->getNdbError().code << std::endl; myNdb->closeTransaction(myTrans); return -1; } /** * start of loop: nextResult(true) means that "parallelism" number of * rows are fetched from NDB and cached in NDBAPI */ while((check = myScanOp->nextResult(true)) == 0){ do { /** * Get update operation */ NdbOperation * myUpdateOp = myScanOp->updateCurrentTuple(); if (myUpdateOp == 0) { std::cout << myTrans->getNdbError().message << std::endl; myNdb->closeTransaction(myTrans); return -1; } updatedRows++; /** * do the update */ myUpdateOp->setValue(update_column, after_color); /** * nextResult(false) means that the records * cached in the NDBAPI are modified before * fetching more rows from NDB. */ } while((check = myScanOp->nextResult(false)) == 0); /** * NoCommit when all cached tuple have been updated */ if(check != -1) { check = myTrans->execute(NdbTransaction::NoCommit); } /** * Check for errors */ err = myTrans->getNdbError(); if(check == -1) { if(err.status == NdbError::TemporaryError){ std::cout << myTrans->getNdbError().message << std::endl; myNdb->closeTransaction(myTrans); milliSleep(50); continue; } } /** * End of loop */ } /** * Commit all prepared operations */ if(myTrans->execute(NdbTransaction::Commit) == -1) { if(err.status == NdbError::TemporaryError){ std::cout << myTrans->getNdbError().message << std::endl; myNdb->closeTransaction(myTrans); milliSleep(50); continue; } } std::cout << myTrans->getNdbError().message << std::endl; myNdb->closeTransaction(myTrans); return 0; } if(myTrans!=0) { std::cout << myTrans->getNdbError().message << std::endl; myNdb->closeTransaction(myTrans); } return -1; } int scan_print(Ndb * myNdb) { // Scan all records exclusive and update // them one by one int retryAttempt = 0; const int retryMax = 10; int fetchedRows = 0; int check; NdbError err; NdbTransaction *myTrans; NdbScanOperation *myScanOp; /* Result of reading attribute value, three columns: REG_NO, BRAND, and COLOR */ NdbRecAttr * myRecAttr[3]; const NdbDictionary::Dictionary* myDict= myNdb->getDictionary(); const NdbDictionary::Table *myTable= myDict->getTable("GARAGE"); if (myTable == NULL) APIERROR(myDict->getNdbError()); /** * Loop as long as : * retryMax not reached * failed operations due to TEMPORARY erros * * Exit loop; * retyrMax reached * Permanent error (return -1) */ while (true) { if (retryAttempt >= retryMax) { std::cout << "ERROR: has retried this operation " << retryAttempt << " times, failing!" << std::endl; return -1; } myTrans = myNdb->startTransaction(); if (myTrans == NULL) { const NdbError err = myNdb->getNdbError(); if (err.status == NdbError::TemporaryError) { milliSleep(50); retryAttempt++; continue; } std::cout << err.message << std::endl; return -1; } /* * Define a scan operation. * NDBAPI. */ myScanOp = myTrans->getNdbScanOperation(myTable); if (myScanOp == NULL) { std::cout << myTrans->getNdbError().message << std::endl; myNdb->closeTransaction(myTrans); return -1; } /** * Read without locks, without being placed in lock queue */ if( myScanOp->readTuples(NdbOperation::LM_CommittedRead) == -1) { std::cout << myTrans->getNdbError().message << std::endl; myNdb->closeTransaction(myTrans); return -1; } /** * Define storage for fetched attributes. * E.g., the resulting attributes of executing * myOp->getValue("REG_NO") is placed in myRecAttr[0]. * No data exists in myRecAttr until transaction has commited! */ myRecAttr[0] = myScanOp->getValue("REG_NO"); myRecAttr[1] = myScanOp->getValue("BRAND"); myRecAttr[2] = myScanOp->getValue("COLOR"); if(myRecAttr[0] ==NULL || myRecAttr[1] == NULL || myRecAttr[2]==NULL) { std::cout << myTrans->getNdbError().message << std::endl; myNdb->closeTransaction(myTrans); return -1; } /** * Start scan (NoCommit since we are only reading at this stage); */ if(myTrans->execute(NdbTransaction::NoCommit) != 0){ err = myTrans->getNdbError(); if(err.status == NdbError::TemporaryError){ std::cout << myTrans->getNdbError().message << std::endl; myNdb->closeTransaction(myTrans); milliSleep(50); continue; } std::cout << err.code << std::endl; std::cout << myTrans->getNdbError().code << std::endl; myNdb->closeTransaction(myTrans); return -1; } /** * start of loop: nextResult(true) means that "parallelism" number of * rows are fetched from NDB and cached in NDBAPI */ while((check = myScanOp->nextResult(true)) == 0){ do { fetchedRows++; /** * print REG_NO unsigned int */ std::cout << myRecAttr[0]->u_32_value() << "\t"; /** * print BRAND character string */ std::cout << myRecAttr[1]->aRef() << "\t"; /** * print COLOR character string */ std::cout << myRecAttr[2]->aRef() << std::endl; /** * nextResult(false) means that the records * cached in the NDBAPI are modified before * fetching more rows from NDB. */ } while((check = myScanOp->nextResult(false)) == 0); } myNdb->closeTransaction(myTrans); return 1; } return -1; } int main() { ndb_init(); MYSQL mysql; /************************************************************** * Connect to mysql server and create table * **************************************************************/ { if ( !mysql_init(&mysql) ) { std::cout << "mysql_init failed\n"; exit(-1); } if ( !mysql_real_connect(&mysql, "localhost", "root", "", "", 3306, "/tmp/mysql.sock", 0) ) MYSQLERROR(mysql); mysql_query(&mysql, "CREATE DATABASE TEST_DB"); if (mysql_query(&mysql, "USE TEST_DB") != 0) MYSQLERROR(mysql); create_table(mysql); } /************************************************************** * Connect to ndb cluster * **************************************************************/ Ndb_cluster_connection cluster_connection; if (cluster_connection.connect(4, 5, 1)) { std::cout << "Unable to connect to cluster within 30 secs." << std::endl; exit(-1); } // Optionally connect and wait for the storage nodes (ndbd's) if (cluster_connection.wait_until_ready(30,0) < 0) { std::cout << "Cluster was not ready within 30 secs.\n"; exit(-1); } Ndb myNdb(&cluster_connection,"TEST_DB"); if (myNdb.init(1024) == -1) { // Set max 1024 parallel transactions APIERROR(myNdb.getNdbError()); exit(-1); } /******************************************* * Check table definition * *******************************************/ int column_color; { const NdbDictionary::Dictionary* myDict= myNdb.getDictionary(); const NdbDictionary::Table *t= myDict->getTable("GARAGE"); Car car; if (t->getColumn("COLOR")->getLength() != sizeof(car.color) || t->getColumn("BRAND")->getLength() != sizeof(car.brand)) { std::cout << "Wrong table definition" << std::endl; exit(-1); } column_color= t->getColumn("COLOR")->getColumnNo(); } if(populate(&myNdb) > 0) std::cout << "populate: Success!" << std::endl; if(scan_print(&myNdb) > 0) std::cout << "scan_print: Success!" << std::endl << std::endl; std::cout << "Going to delete all pink cars!" << std::endl; { /** * Note! color needs to be of exact the same size as column defined */ Car tmp; sprintf(tmp.color, "Pink"); if(scan_delete(&myNdb, column_color, tmp.color) > 0) std::cout << "scan_delete: Success!" << std::endl << std::endl; } if(scan_print(&myNdb) > 0) std::cout << "scan_print: Success!" << std::endl << std::endl; { /** * Note! color1 & 2 need to be of exact the same size as column defined */ Car tmp1, tmp2; sprintf(tmp1.color, "Blue"); sprintf(tmp2.color, "Black"); std::cout << "Going to update all " << tmp1.color << " cars to " << tmp2.color << " cars!" << std::endl; if(scan_update(&myNdb, column_color, tmp1.color, tmp2.color) > 0) std::cout << "scan_update: Success!" << std::endl << std::endl; } if(scan_print(&myNdb) > 0) std::cout << "scan_print: Success!" << std::endl << std::endl; return 0; }