Working with Cassandra 0.7

In an earlier post, I experimented with inserting data from Oracle into Cassandra column families using Hector. Unfortunately, that code isn’t compatible with the Cassandra 0.7 release, so I had to rework it. The new version uses the addInsertion method of the Mutator object; it isn’t totally intuitive, but it didn’t take long to get working. Here are the key changes:

    private static void insertSales(Connection oracleConn, Keyspace keyspace,
            String cfName) throws SQLException {
        int rows = 0;
        ColumnPath cf = new ColumnPath(cfName);
        Statement query = oracleConn.createStatement();

        String sqlText = "SELECT cust_id, cust_first_name,  cust_last_name, prod_name, "
                + "           SUM (amount_sold) sum_amount_sold,sum(quantity_sold) sum_quantity_sold "
                + "          FROM sh.sales    "
                + "          JOIN sh.customers USING (cust_id) "
                + "          JOIN sh.products  USING (prod_id)  "
                + "         GROUP BY cust_id, cust_first_name,  cust_last_name,  prod_name "
                + "         ORDER BY cust_id, prod_name ";
        ResultSet results = query.executeQuery(sqlText);
        int rowCount = 0;
        int lastCustId = -1;
        while (results.next()) { // For each customer
            Integer custId = results.getInt("CUST_ID");
            String keyValue = custId.toString();

            if (rowCount++ == 0 || custId != lastCustId) { // New Customer
                String custFirstName = results.getString("CUST_FIRST_NAME");
                String custLastName = results.getString("CUST_LAST_NAME");
                System.out.printf("%s %s\n", custFirstName, custLastName);
                // Create a supercolumn for customer details (first, lastname)
                Mutator<String> mutator = HFactory.createMutator(keyspace,
                        stringSerializer);
                mutator.addInsertion(keyValue, cfName, HFactory
                        .createSuperColumn("CustomerDetails", Arrays
                                .asList(HFactory.createStringColumn(
                                        "customerFirstName", custFirstName)),
                                StringSerializer.get(), StringSerializer.get(),
                                StringSerializer.get()));
                mutator.addInsertion(keyValue, cfName, HFactory
                        .createSuperColumn("CustomerDetails", Arrays
                                .asList(HFactory.createStringColumn(
                                        "customerLastName", custLastName)),
                                StringSerializer.get(), StringSerializer.get(),
                                StringSerializer.get()));

                mutator.execute();
            }
            // Insert product sales total for that customer
            String prodName = results.getString("PROD_NAME");
            Float SumAmountSold = results.getFloat("SUM_AMOUNT_SOLD");
            Float SumQuantitySold = results.getFloat("SUM_QUANTITY_SOLD");
            // Supercolumn name is the product name
            Mutator<String> mutator = HFactory.createMutator(keyspace,
                    stringSerializer);
            mutator.addInsertion(keyValue, cfName, HFactory.createSuperColumn(
                    prodName, Arrays.asList(HFactory.createStringColumn(
                            "AmountSold", SumAmountSold.toString())),
                    StringSerializer.get(), StringSerializer.get(),
                    StringSerializer.get()));
            mutator.addInsertion(keyValue, cfName, HFactory.createSuperColumn(
                    prodName, Arrays.asList(HFactory.createStringColumn(
                            "QuantitySold", SumQuantitySold.toString())),
                    StringSerializer.get(), StringSerializer.get(),
                    StringSerializer.get()));
            mutator.execute();
            lastCustId = custId;
            rows++;
        }
        System.out.println(rows + " rows loaded into " + cf.getColumn_family());
    }
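
To make the layout that insertSales produces easier to picture, here is a small sketch that builds the same shape with plain Java maps, with no Cassandra or Hector involved. The class name, the method names, and the sample rows are all made up for illustration; only the supercolumn and column names ("CustomerDetails", "AmountSold" and so on) come from the listing above.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Illustration only: models row key -> supercolumn -> column -> value
// with nested maps. None of these types are Hector or Cassandra classes.
public class SuperColumnLayoutSketch {

    // Hypothetical sample of the flat, sorted rows the Oracle query returns:
    // {custId, firstName, lastName, prodName, amountSold, quantitySold}
    static List<String[]> sampleRows() {
        return Arrays.asList(
                new String[] {"2", "Anne", "Koch", "Keyboard", "120.0", "2.0"},
                new String[] {"2", "Anne", "Koch", "Mouse Pad", "10.5", "1.0"},
                new String[] {"3", "Buick", "Emmerson", "Keyboard", "60.0", "1.0"});
    }

    static Map<String, Map<String, Map<String, String>>> group(List<String[]> rows) {
        Map<String, Map<String, Map<String, String>>> columnFamily = new LinkedHashMap<>();
        for (String[] r : rows) {
            Map<String, Map<String, String>> row = columnFamily.get(r[0]);
            if (row == null) {
                // First row for this customer: add the CustomerDetails supercolumn
                row = new LinkedHashMap<>();
                Map<String, String> details = new LinkedHashMap<>();
                details.put("customerFirstName", r[1]);
                details.put("customerLastName", r[2]);
                row.put("CustomerDetails", details);
                columnFamily.put(r[0], row);
            }
            // One supercolumn per product, named after the product
            Map<String, String> sales = new LinkedHashMap<>();
            sales.put("AmountSold", r[4]);
            sales.put("QuantitySold", r[5]);
            row.put(r[3], sales);
        }
        return columnFamily;
    }

    public static void main(String[] args) {
        // Prints the nested structure, one customer (row key) per line
        for (Map.Entry<String, Map<String, Map<String, String>>> e
                : group(sampleRows()).entrySet()) {
            System.out.println(e.getKey() + " -> " + e.getValue());
        }
    }
}
```

Each customer ID becomes one row key, holding a CustomerDetails supercolumn plus one supercolumn per product purchased.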

The reason I wanted to do this was to play with Cassandra using our (relatively) new Toad for Cloud Databases Eclipse client. Toad for Cloud Databases lets you work with non-relational data sources such as Cassandra, HBase, and SimpleDB using SQL.

Here’s how it works.  We select the column family we want to map from the Cassandra server:

[Screenshot: mapping a Cassandra column family, step 1]

That column family contains data loaded from both the Oracle CUSTOMERS and SALES tables. Toad recognizes that the data in that single column family is best represented by two normalized tables, and gives us the opportunity to specify the names for the primary and foreign keys. We can also rename the “tables” (more like views really) that Toad will create:

[Screenshot: mapping a Cassandra column family, step 2]
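
As a rough picture of what "two normalized tables" means here, the sketch below flattens a nested row key -> supercolumn -> column structure into a parent table and a child table linked by the customer ID. This is only my illustration of the idea, not how Toad does the mapping internally; the class, the method names, and the sample data are hypothetical.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Illustration only: splits one super column family (modelled as nested maps)
// into a customer table and a sales table that share the row key.
public class NormalizedViewSketch {

    // Small helper to build a column map from name/value pairs
    static Map<String, String> columns(String... nameValuePairs) {
        Map<String, String> m = new LinkedHashMap<>();
        for (int i = 0; i + 1 < nameValuePairs.length; i += 2) {
            m.put(nameValuePairs[i], nameValuePairs[i + 1]);
        }
        return m;
    }

    // Hypothetical sample data in the shape the loader writes
    static Map<String, Map<String, Map<String, String>>> sampleColumnFamily() {
        Map<String, Map<String, Map<String, String>>> cf = new LinkedHashMap<>();
        Map<String, Map<String, String>> anne = new LinkedHashMap<>();
        anne.put("CustomerDetails", columns("customerFirstName", "Anne", "customerLastName", "Koch"));
        anne.put("Keyboard", columns("AmountSold", "120.0", "QuantitySold", "2.0"));
        anne.put("Mouse Pad", columns("AmountSold", "10.5", "QuantitySold", "1.0"));
        cf.put("2", anne);
        Map<String, Map<String, String>> buick = new LinkedHashMap<>();
        buick.put("CustomerDetails", columns("customerFirstName", "Buick", "customerLastName", "Emmerson"));
        buick.put("Keyboard", columns("AmountSold", "60.0", "QuantitySold", "1.0"));
        cf.put("3", buick);
        return cf;
    }

    // Parent table: {custId (primary key), firstName, lastName}
    static List<String[]> customerTable(Map<String, Map<String, Map<String, String>>> cf) {
        List<String[]> out = new ArrayList<>();
        for (Map.Entry<String, Map<String, Map<String, String>>> row : cf.entrySet()) {
            Map<String, String> d = row.getValue().get("CustomerDetails");
            if (d != null) {
                out.add(new String[] {row.getKey(),
                        d.get("customerFirstName"), d.get("customerLastName")});
            }
        }
        return out;
    }

    // Child table: {custId (foreign key), prodName, amountSold, quantitySold}
    static List<String[]> salesTable(Map<String, Map<String, Map<String, String>>> cf) {
        List<String[]> out = new ArrayList<>();
        for (Map.Entry<String, Map<String, Map<String, String>>> row : cf.entrySet()) {
            for (Map.Entry<String, Map<String, String>> sc : row.getValue().entrySet()) {
                if (sc.getKey().equals("CustomerDetails")) {
                    continue; // belongs to the parent table
                }
                out.add(new String[] {row.getKey(), sc.getKey(),
                        sc.getValue().get("AmountSold"), sc.getValue().get("QuantitySold")});
            }
        }
        return out;
    }

    public static void main(String[] args) {
        for (String[] c : customerTable(sampleColumnFamily())) {
            System.out.println(String.join(" | ", c));
        }
        for (String[] s : salesTable(sampleColumnFamily())) {
            System.out.println(String.join(" | ", s));
        }
    }
}
```

The row key plays the role of the primary key in the customer table and the foreign key in the sales table, which is why it makes sense to name both when setting up the mapping.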

The resulting tables look similar to the tables that we originally loaded from Oracle, and we can issue SQL queries against them just as we could have with Oracle. The queries get translated from SQL to Thrift calls against the underlying Cassandra server:

[Screenshot: Cassandra query]

I definitely find it easier to issue SQL than to write a 200-line Java program to do the same thing! Of course, I'm not much of a Java programmer, but at a minimum, having Toad to query the Cassandra data is invaluable when checking that your program did what it was intended to do.