Getting started with Apache Pig

If, like me, you want to play around with data in a Hadoop cluster without having to write hundreds or thousands of lines of Java MapReduce code, you will most likely use either Hive (via the Hive Query Language, HQL) or Pig.

Hive is a SQL-like language which compiles to MapReduce jobs, while Pig is a data flow language which allows you to specify your MapReduce data pipelines using high-level abstractions.

The way I like to think of it is that writing Java MapReduce is like programming in assembler: you need to manually construct every low-level operation you want to perform. Hive allows people familiar with SQL to extract data from Hadoop with ease and – like SQL – you specify the data you want without having to worry too much about the way in which it is retrieved. Writing a Pig script is like writing a SQL execution plan: you specify the exact sequence of operations you want to undertake when retrieving the data. Pig also allows you to specify more complex data flows than are possible using HQL alone.

As a crusty old RDBMS guy, I at first thought that Hive and HQL were the more attractive solution, and I still think Hive is critical to enterprise adoption of Hadoop since it opens up Hadoop to the world of enterprise Business Intelligence. But Pig really appeals to me as someone who has spent so much time tuning SQL. The Hive optimizer is currently at the level of the rule-based RDBMS optimizers of the early 90s. It will get better, and get better quickly, but given the massive size of most Hadoop clusters, the cost of a poorly optimized HQL statement is really high. Explicitly specifying the execution plan in Pig arguably gives the programmer more control and lessens the likelihood of the “HQL statement from Hell” bringing a cluster to its knees.

So I’ve started learning Pig, using the familiar (to me) Oracle sample schema, which I imported into Hadoop using SQOOP. (Hint: Pig likes tab-separated files, so use the --fields-terminated-by '\t' flag in your SQOOP job.)

Here’s a diagram I created showing how some of the more familiar HQL idioms are implemented in Pig:

Note how, using Pig, we explicitly control the execution plan: in HQL it’s up to the optimizer whether tables are joined before or after the “country_region=’Asia’” filter is applied, whereas in Pig I explicitly execute the filter before the join. It turns out that the Hive optimizer does the same thing, but for complex data flows, being able to explicitly control the sequence of events can be an advantage.

Pig is only a little wordier than HQL, and while I definitely like the familiar syntax of HQL, I really like the additional control of Pig.

Amazon Elastic Map Reduce (EMR), Hive, and TOAD

Since my first post on connecting to Amazon Elastic Map Reduce with TOAD, we’ve added quite a few features to our Hadoop support in general and our EMR support specifically, so I thought I’d summarize those features in this blog post.

Amazon Elastic Map Reduce is a cloud-based version of Hadoop hosted on Amazon Elastic Compute Cloud (EC2) instances. Using EMR, you can quickly establish a cloud-based Hadoop cluster to perform MapReduce workflows.

EMR supports Hive, of course, and Toad for Cloud Databases (TCD) includes Hive support, so let’s look at using that to query EMR data.

Using the Toad direct Hive client

 

TCD’s direct Hive connection is the quickest way to establish a connection to Hive. It uses a bundled JDBC driver to establish the connection.

Below we create a new connection to a Hive server running on EMR:


  1. Right click on Hive connections and choose “Connect to Hive” to create a new Hive connection.
  2. The host address is the “Master” EC2 instance for your EMR cluster.  You’ll find that on the EMR Job flow management page within your Amazon AWS console.  The Hive 0.5 server is running on port 10000 by default.
  3. Specifying a job tracker port allows us to track the execution of our Hive jobs in EMR.  The standard Hadoop jobtracker port is 50030, but in EMR it’s 9600.
  4. It’s possible to open up port 10000 so you can connect directly with Hive clients, but that’s usually a bad idea. Hive has negligible built-in security, so you’d be exposing your Hive data. For that reason we support an SSH mode in which you can tunnel through to your Hadoop server using the keypair file that you used to start the EMR job flow. The key name is also shown in the EMR console page, though obviously you’ll need the actual keypair file.

The direct Hive client allows you to execute any legal HiveQL command. In the example below, we create a new Hive table based on data held in an S3 bucket (the data is some UN data on homicide rates that I uploaded).
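The same kind of statement can also be issued programmatically over Hive’s JDBC interface. Here’s a minimal sketch: the EMR host name, table definition and S3 path are made-up placeholders, and it assumes the HiveServer1-era JDBC driver is on the classpath.

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.Statement;

public class HiveS3TableSketch {
    public static void main(String[] args) throws Exception {
        // HiveServer1-era JDBC driver and URL format
        Class.forName("org.apache.hadoop.hive.jdbc.HiveDriver");
        Connection conn = DriverManager.getConnection(
                "jdbc:hive://ec2-xx-xx-xx-xx.compute-1.amazonaws.com:10000/default", "", "");
        Statement stmt = conn.createStatement();
        // Define an external table over tab-separated data already sitting in an S3 bucket
        stmt.executeQuery("CREATE EXTERNAL TABLE homicide_rates (country STRING, rate DOUBLE) "
                + "ROW FORMAT DELIMITED FIELDS TERMINATED BY '\\t' "
                + "LOCATION 's3://my-bucket/un-homicide-data/'");
        conn.close();
    }
}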


Connecting Hive to the Toad data hub

 

It’s great to be able to use Hive to exploit MapReduce using familiar (to me) SQL-like syntax. But the real advantage of TCD for Hive is that it can link to data that might be held in other sources – like Oracle, Cassandra, SQL Server, MongoDB, etc.

Setting up a hub connection to EMR Hive is very similar to setting up a direct connection. Of course you need a data hub installed (see here for instructions); then right-click on the hub node and select “map data source”.

Now that the hub knows about the EMR Hive connection, we can issue queries that access Hive and – in the same SQL – other data sources. For instance, here’s a query that joins homicide data in Hive on Elastic Map Reduce with population data stored in an Oracle database (running on Amazon RDS: Relational Database Service). We can do these cross-platform joins across a lot of different types of database sources, including any ODBC-compliant database, any Apache HBase or Hive connection, Cassandra, MongoDB, SimpleDB, and Azure Table Services.

In the version that we are just about to release, queries can be saved as views or snapshots, allowing easier access from external tools or for users who aren’t familiar with SQL. In the example above, I’m saving my query as a view.

 

Using other hub-enabled clients

 

TCD isn’t the only product that can issue hub queries. In beta today, the Quest Business Intelligence Studio can attach to the data hub, and allows you to graphically explore your data using drag-and-drop, click-and-drilldown paradigms.

It’s great to be living in Australia – one of the lowest homicide rates!

If you’re a hard-core data scientist, you can even attach R to the hub via the RODBC interface. So, for instance, in the screenshot below, I’m using R to investigate the correlation between population density and homicide rate. The data comes from Hive (EMR) and Oracle (RDS), is joined in the hub, saved as a snapshot and then fed into R for analysis. Pretty cool for a crusty old stats guy like me (my very first computer program was written in 1979 in SPSS).


Comparing Hadoop Oracle loaders

Oracle put a lot of effort into highlighting the upcoming Oracle Hadoop Loader (OHL) at OOW 2011 – it was even highlighted in Andy Mendelsohn's keynote.  It’s great to see Oracle recognizing Hadoop as a top tier technology!

However, there were a few comments made about the “other loaders” that I wanted to clarify. At Quest, I lead the team that writes the Quest Data Connector for Oracle and Hadoop (let’s call it the “Quest Connector”), which is a plug-in to the Apache Hadoop SQOOP framework and which provides optimized bidirectional data loads between Oracle and Hadoop. Below I’ve outlined some of the high-level features of the Quest Connector in the context of the Oracle-Hadoop loaders. Of course, I got my information on the Oracle loader from technical sessions at OOW, so I may have misunderstood and/or the facts may change between now and the eventual release of that loader. But I wanted to go on the record with the following:

  • All parties (Quest, Cloudera, Oracle) agree that native SQOOP (i.e., without the Quest plug-in) will be sub-optimal: it will not exploit Oracle direct path reads or writes, will not use partitioning, nologging, etc. Both Cloudera and Quest recommend that if you are doing transfers between Oracle and Hadoop you use SQOOP with the Quest connector.
  • The Quest connector is a free, open source plug-in to SQOOP, which is itself a free, open source software product. Both are licensed under the Apache 2.0 open source license. Licensing for the Oracle loader has not been announced, but Oracle has said it will be a commercial product and therefore presumably not free under all circumstances. It’s definitely not open source.
  • The Quest loader is available now (version 1.4), the Oracle loader is in beta and will be released commercially in 2012.
  • The Oracle loader moves data from Hadoop to Oracle only.  The Quest loader can also move data from Oracle to Hadoop.   We import data into Hadoop from an Oracle database usually 5+ times faster than SQOOP alone.
  • Both Quest and the Oracle loader use direct path writes when loading from Hadoop to Oracle.  Oracle do say they use OCI calls which may be faster than the direct path SQL calls used by Quest in some circumstances.   But I’d suggest that the main optimization in each case is direct path.
  • Both Quest and the Oracle loader can do parallel direct path writes to a partitioned Oracle table.  In the case of the Quest loader, we create partitions based on the job and mapper ids.  Oracle can use logical keys and write into existing partitioned tables.  My understanding is that they will shuffle and sort the data in the mappers to direct the output to the appropriate partition in bulk.  They also do statistical sampling which may improve the load balancing when you are inserting into an existing table. 
  • The Quest loader can update existing tables, and can do merge operations that insert or update rows depending on the existence of a matching key value. My understanding is that the Oracle loader will do inserts only – at least initially.
  • Both the Quest connector and the Oracle loader have some form of GUI.  The Oracle GUI I believe is in the commercial ODI product.  The Quest GUI is in the free Toad for Cloud Databases Eclipse plug-in.  I’ve put a screenshot of that at the end of the post.
  • The Quest connector uses the SQOOP framework, which is an Apache Hadoop sub-project maintained by multiple companies, most notably Cloudera. This means that the Hadoop side of the product was written by people with a lot of experience in Hadoop. Cloudera and Quest jointly support SQOOP when used with the Quest connector, so you get the benefit of having very experienced Hadoop people involved as well as Quest people who know Oracle very well. Obviously Oracle knows Oracle better than anyone, but people like me have been working with Oracle for decades and have, I think, some credibility when it comes to Oracle performance optimization.

Again, I’m happy to see Oracle embracing Hadoop; I just wanted to set the record straight with regard to our technology, which exists today as a free tool for optimized bidirectional data transfer between Oracle and Hadoop.

You can download the Quest Connector at http://bit.ly/questHadoopConnector.  The documentation is at  http://bit.ly/QuestHadoopDoc.


MongoDB, Oracle and Toad for Cloud Databases

We recently added support for MongoDB in Toad for Cloud Databases, so I took the opportunity of writing my first MongoDB Java program and taking the Toad functionality for a test drive.

MongoDB is a non-relational, document-oriented database that is extremely popular with developers (see for instance this Hacker News poll). Toad for Cloud Databases allows you to work with non-relational data using SQL by normalizing the data structures and converting SQL to the non-relational calls.

I wanted to get started by creating some MongoDB collections with familiar data.  So I wrote a Java program that takes data out of the Oracle sample schema, and loads it into Mongo as documents.  The program is here.

 

The key parts of the code are shown here:

   1: while (custRs.next()) { // For each customer
   2:     String custId = custRs.getString("CUST_ID");
   3:     String custFirstName = custRs.getString("CUST_FIRST_NAME");
   4:     String custLastName = custRs.getString("CUST_LAST_NAME");
   5:  
   6:     //Create the customer document 
   7:     BasicDBObject custDoc = new BasicDBObject();
   8:     custDoc.put("_id", custId);
   9:     custDoc.put("CustomerFirstName", custFirstName);
  10:     custDoc.put("CustomerLastName", custLastName);
  11:     // Create the product sales document 
  12:     BasicDBObject customerProducts = new BasicDBObject();
  13:     custSalesQry.setString(1, custId);
  14:     ResultSet prodRs = custSalesQry.executeQuery();
  15:     Integer prodCount = 0;
  16:     while (prodRs.next()) { //For each product sale 
  17:         String  timeId=prodRs.getString("TIME_ID"); 
  18:         Integer prodId = prodRs.getInt("PROD_ID");
  19:         String prodName = prodRs.getString("PROD_NAME");
  20:         Float Amount = prodRs.getFloat("AMOUNT_SOLD");
  21:         Float Quantity = prodRs.getFloat("QUANTITY_SOLD");
  22:         // Create the line item document 
  23:         BasicDBObject productItem = new BasicDBObject();            
  24:         productItem.put("prodId", prodId);
  25:         productItem.put("prodName", prodName);
  26:         productItem.put("Amount", Amount);
  27:         productItem.put("Quantity", Quantity);
  28:         // Put the line item in the salesforcustomer document 
  29:         customerProducts.put(timeId, productItem);
  30:         if (prodCount++ > 4) { // Just 5 for this demo
  31:             prodCount = 0;
  32:             break;
  33:         }
  34:     }
  35:     // put the salesforcustomer document in the customer document 
  36:     custDoc.put("SalesForCustomer", customerProducts);
  37:  
  38:     System.out.println(custDoc);
  39:     custColl.insert(custDoc);  //insert the customer 
  40:     custCount++;
  41:  
  42: }

Here’s how it works:

Lines Description
1-4 We loop through each customer,   retrieving the key customer details
7-10 We create a basic MongoDB document that contains the customer details
12 We create another MongoDB document that will contain all the product sales for the customer
16-21 Fetching the data for an individual sale for that customer from Oracle
23-27 We create a document for that single sale
29 Add the sale to the document containing all the sales
36 Add all the sales to the customer
39 Add the customer document to the collection

 

The MongoDB API is very straightforward; much easier than the equivalent APIs for HBase or Cassandra.
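The excerpt above doesn’t show how custColl, custRs and custSalesQry are set up. Here’s a rough sketch of that plumbing using the legacy MongoDB Java driver; the connection strings, schema, database and collection names are assumptions for illustration, not the exact code from my program.

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.Statement;

import com.mongodb.DB;
import com.mongodb.DBCollection;
import com.mongodb.Mongo;

public class OracleToMongoSetup {
    public static void main(String[] args) throws Exception {
        // Oracle side: all customers, plus a per-customer sales query
        Connection oracleConn = DriverManager.getConnection(
                "jdbc:oracle:thin:@//localhost:1521/orcl", "sh", "password");
        Statement custStmt = oracleConn.createStatement();
        ResultSet custRs = custStmt.executeQuery(
                "SELECT cust_id, cust_first_name, cust_last_name FROM customers");
        PreparedStatement custSalesQry = oracleConn.prepareStatement(
                "SELECT time_id, prod_id, prod_name, amount_sold, quantity_sold"
              + "  FROM sales JOIN products USING (prod_id) WHERE cust_id = ?");

        // MongoDB side: legacy driver, local server, assumed database/collection names
        Mongo mongo = new Mongo("localhost", 27017);
        DB db = mongo.getDB("SH");
        DBCollection custColl = db.getCollection("customers");

        int custCount = 0;
        // ... the customer loop from the excerpt above goes here ...
    }
}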

When we run the program, we create JSON documents in MongoDB that look like this:

   1: { "_id" : "7" , "CustomerFirstName" : "Linette" , "CustomerLastName" : "Ingram" , 
   2:     "SalesForCustomer" : {
   3:         "2001-05-30 00:00:00" : { "prodId" : 28 , "prodName" : "Unix/Windows 1-user pack" , "Amount" : 205.76 , "Quantity" : 1.0} , 
   4:         "1998-04-18 00:00:00" : { "prodId" : 129 , "prodName" : "Model NM500X High Yield Toner Cartridge" , "Amount" : 205.48 , "Quantity" : 1.0}
   5:     }
   6: }
   7: { "_id" : "8" , "CustomerFirstName" : "Vida" , "CustomerLastName" : "Puleo" , 
   8:     "SalesForCustomer" : { 
   9:         "1999-01-27 00:00:00" : { "prodId" : 18 , "prodName" : "Envoy Ambassador" , "Amount" : 1726.83 , "Quantity" : 1.0} , 
  10:         "1999-01-28 00:00:00" : { "prodId" : 18 , "prodName" : "Envoy Ambassador" , "Amount" : 1726.83 , "Quantity" : 1.0} , 
  11:         "1998-04-26 00:00:00" : { "prodId" : 20 , "prodName" : "Home Theatre Package with DVD-Audio/Video Play" , "Amount" : 608.39 , "Quantity" : 1.0} ,
  12:         "1998-01-19 00:00:00" : { "prodId" : 28 , "prodName" : "Unix/Windows 1-user pack" , "Amount" : 216.99 , "Quantity" : 1.0} , 
  13:         "1998-03-19 00:00:00" : { "prodId" : 28 , "prodName" : "Unix/Windows 1-user pack" , "Amount" : 216.99 , "Quantity" : 1.0} 
  14:     }
  15: }

 

Toad for Cloud “renormalizes” the documents so that they resemble something that we might use in a more conventional database. So in this case, Toad creates two tables from the Mongo collection: one for customers, and one which contains the sales for each customer. You can rename the auto-generated foreign keys and the sub-table name to make this a bit clearer, as in the example below:

 


 

We can more clearly see the relationships in the .NET client by using Toad’s visual query builder (or we could have used the database diagram tool):

 


 

MongoDB has a pretty rich query language, but it’s fairly mysterious to those of us who are used to SQL, and it’s certainly not as rich as SQL itself. Using Toad for Cloud, you can issue ANSI-standard SQL against your MongoDB tables and quickly browse or perform complex queries. Later this year, this Mongo support will emerge in some of our commercial data management tools, such as Toad for Data Analysts and our soon-to-be-announced BI tools.

 


Using Toad with Hive in Amazon Elastic Map Reduce

The Toad for Cloud Databases Eclipse client has support for Hive queries, which makes it really easy for me to run queries against our test Hadoop clusters. It also supports Hive running on top of Amazon Elastic Map Reduce (EMR), but you do need to be aware that in EMR the default ports are different from what we have come to expect.

Firstly, if you have started an EMR cluster with Hive 0.5 support, then the Hive server will be running on port 10001, not port 10000. The second difference is that the JobTracker is running on port 9100, rather than 50030. So when attaching to EMR, you would set up your Hive connection something like this:


Once you’ve done that, the Hive connection will show all the Hive tables and you can enter HQL queries in the SQL editor.  You can drag table and column names into the editor as well:


One of the simple but really useful things about the Hive client is that you can jump to the JobTracker web page while the HQL is running to see how it is going:


Here’s the resulting JobTracker console. We can see the job running and – if we scroll to the right or maximize the window – we can see how the map and reduce phases of the Hive job are progressing:


Working with Cassandra 0.7

In an earlier post, I experimented with inserting data from Oracle into Cassandra column families using Hector. Unfortunately, that code isn’t compatible with the latest Cassandra 0.7 release, so I had to rework it. The new version uses the addInsertion method of the Mutator object and, while not totally intuitive, it didn’t take long to get working. Here are the key changes:

   1: private static void insertSales(Connection oracleConn, Keyspace keyspace,
   2:         String cfName) throws SQLException {
   3:     int rows = 0;
   4:     ColumnPath cf = new ColumnPath(cfName);
   5:     Statement query = oracleConn.createStatement();
   6:  
   7:     String sqlText = "SELECT cust_id, cust_first_name,  cust_last_name, prod_name, "
   8:             + "           SUM (amount_sold) sum_amount_sold,sum(quantity_sold) sum_quantity_sold "
   9:             + "          FROM sh.sales    "
  10:             + "          JOIN sh.customers USING (cust_id) "
  11:             + "          JOIN sh.products  USING (prod_id)  "
  12:             + "         GROUP BY cust_id, cust_first_name,  cust_last_name,  prod_name "
  13:             + "         ORDER BY cust_id, prod_name ";
  14:     ResultSet results = query.executeQuery(sqlText);
  15:     int rowCount = 0;
  16:     int lastCustId = -1;
  17:     while (results.next()) { // For each customer
  18:         Integer custId = results.getInt("CUST_ID");
  19:         String keyValue = custId.toString();
  20:  
  21:         if (rowCount++ == 0 || custId != lastCustId) { // New Customer
  22:             String custFirstName = results.getString("CUST_FIRST_NAME");
  23:             String custLastName = results.getString("CUST_LAST_NAME");
  24:             System.out.printf("%s %s\n", custFirstName, custLastName);
  25:             // Create a supercolumn for customer details (first, lastname)
  26:             Mutator<String> mutator = HFactory.createMutator(keyspace,
  27:                     stringSerializer);
  28:             mutator.addInsertion(keyValue, cfName, HFactory
  29:                     .createSuperColumn("CustomerDetails", Arrays
  30:                             .asList(HFactory.createStringColumn(
  31:                                     "customerFirstName", custFirstName)),
  32:                             StringSerializer.get(), StringSerializer.get(),
  33:                             StringSerializer.get()));
  34:             mutator.addInsertion(keyValue, cfName, HFactory
  35:                     .createSuperColumn("CustomerDetails", Arrays
  36:                             .asList(HFactory.createStringColumn(
  37:                                     "customerLastName", custLastName)),
  38:                             StringSerializer.get(), StringSerializer.get(),
  39:                             StringSerializer.get()));
  40:  
  41:             mutator.execute();
  42:         }
  43:         // Insert product sales total for that customer
  44:         String prodName = results.getString("PROD_NAME");
  45:         Float SumAmountSold = results.getFloat("SUM_AMOUNT_SOLD");
  46:         Float SumQuantitySold = results.getFloat("SUM_QUANTITY_SOLD");
  47:         // Supercolumn name is the product name
  48:         Mutator<String> mutator = HFactory.createMutator(keyspace,
  49:                 stringSerializer);
  50:         mutator.addInsertion(keyValue, cfName, HFactory.createSuperColumn(
  51:                 prodName, Arrays.asList(HFactory.createStringColumn(
  52:                         "AmountSold", SumAmountSold.toString())),
  53:                 StringSerializer.get(), StringSerializer.get(),
  54:                 StringSerializer.get()));
  55:         mutator.addInsertion(keyValue, cfName, HFactory.createSuperColumn(
  56:                 prodName, Arrays.asList(HFactory.createStringColumn(
  57:                         "QuantitySold", SumQuantitySold.toString())),
  58:                 StringSerializer.get(), StringSerializer.get(),
  59:                 StringSerializer.get()));
  60:         mutator.execute(); 
  61:         lastCustId = custId;
  62:         rows++;
  63:     }
  64:     System.out.println(rows + " rows loaded into " + cf.getColumn_family());
  65: }

The reason I wanted to do this was to play with Cassandra using our (relatively) new Toad for Cloud Databases Eclipse client. Toad for Cloud Databases lets you work with non-relational data sources such as Cassandra, HBase, SimpleDB, etc., using SQL.

Here’s how it works.  We select the column family we want to map from the Cassandra server:


That column family contains data loaded from both the Oracle CUSTOMERS and SALES tables. Toad recognizes that the data in that single column family is best represented by two normalized tables, and gives us the opportunity to specify the names for the primary and foreign keys. We can also rename the “tables” (more like views, really) that Toad will create:


The resulting tables look similar to the tables that we originally loaded from Oracle, and we can issue SQL queries against them just as we could with Oracle. The queries get translated from SQL to Thrift calls against the underlying Cassandra server:


I definitely find it easier to issue SQL than to write a 200-line Java program to do the same thing! Of course, I'm not much of a Java programmer, but at a minimum, having Toad to query the Cassandra data is invaluable when checking that your program did what it was intended to do.

Oracle tables vs Cassandra SuperColumns

 

In my last post,  I wrote some Java code to insert Oracle tables into Cassandra column families.  As much fun as this was for me, it was  fairly trivial and not a particularly useful exercise in terms of learning Cassandra. 

In Cassandra, data modelling is very different from the relational models we are used to, and one would rarely convert a complete Oracle schema directly from tables to ColumnFamilies. Instead, Cassandra data modelling involves the creation of ColumnFamilies with SuperColumns to represent master-detail structures that are commonly referenced together.

SuperColumns vs Relational schema

 

At the Cassandra Summit in August,  Eben Hewitt gave a presentation on Cassandra Data Modelling.   There’s a lot of nuance in that talk and in the topic, but a key point in Cassandra – as in many other NoSQL databases – is that you model data to match the queries you need to satisfy,  rather than to a more theoretically "pure" normalized form.   For relational guys, the process is most similar to radical denormalization in which you introduce redundancy to allow for efficient query processing.

For example, let’s consider the Oracle SH sample schema.  Amongst other things, it includes SALES, PRODUCTS and CUSTOMERS:

 


We could map each Oracle table to a Cassandra ColumnFamily, but because there are no foreign key indexes or joins, such a Cassandra data model would not necessarily support the types of queries we want. For instance, if we want to query sales totals by customer ID, we should create a column family keyed by customer ID, which contains SuperColumns named for each product, which in turn include columns for sales totals. It might look something like this:

Row key 1: CustomerDetails {First Name: Guy, Last Name: Harrison}; Product Name #1 {Quantity: 3, Value: $100,020}; … ; Product Name #N {Quantity: 3, Value: $130,000}

Row key 2: CustomerDetails {First Name: Greg, Last Name: Cottman}; Product Name #2 {Quantity: 34, Value: $10,080}; … ; Product Name #N {Quantity: 4, Value: $99,000}

 

Each customer “row” has a SuperColumn for each product, containing the sales for that product. Not all customers have all the SuperColumns – each customer has SuperColumns only for the products they have purchased. The name of the SuperColumn is the name of the product.

Giving the column the name of the product is a major departure from how we would do things in Oracle.  The name of a column or SuperColumn can be determined by the data, not by the schema - a concept completely alien to relational modelling.

Inserting into SuperColumns with Hector

 

To try to understand this, I created a Cassandra ColumnFamily of type “Super”. Here’s my definition in the storage-conf.xml file:

<ColumnFamily Name="SalesByCustomer" 
ColumnType="Super"
CompareWith="UTF8Type"
CompareSubcolumnsWith="UTF8Type"
Comment="Sales summary for each customer "/>

And here is some of my Hector Java program, which reads sales totals for each customer from the Oracle sample schema, and inserts them into the ColumnFamily:

   1: private static void insertSales(Connection oracleConn, Keyspace keyspace,
   2:         String cfName) throws SQLException {
   3:     int rows = 0;
   4:     ColumnPath cf = new ColumnPath(cfName);
   5:     Statement query = oracleConn.createStatement();
   6:  
   7:     String sqlText = "SELECT cust_id, cust_first_name,  cust_last_name, prod_name, "
   8:             + "           SUM (amount_sold) sum_amount_sold,sum(quantity_sold) sum_quantity_sold "
   9:             + "          FROM sh.sales    "
  10:             + "          JOIN sh.customers USING (cust_id) "
  11:             + "          JOIN sh.products  USING (prod_id)  "
  12:             + "         GROUP BY cust_id, cust_first_name,  cust_last_name,  prod_name "
  13:             + "         ORDER BY cust_id, prod_name ";
  14:     ResultSet results = query.executeQuery(sqlText);
  15:     int rowCount = 0;
  16:     int lastCustId = -1;
  17:     while (results.next()) { // For each customer
  18:         Integer custId = results.getInt("CUST_ID");
  19:         String keyValue = custId.toString();
  20:  
  21:         if (rowCount++ == 0 || custId != lastCustId) { // New Customer
  22:             String custFirstName = results.getString("CUST_FIRST_NAME");
  23:             String custLastName = results.getString("CUST_LAST_NAME");
  24:             System.out.printf("%s %s\n", custFirstName, custLastName);
  25:             //Create a supercolumn for customer details (first, lastname)     
  26:             cf.setSuper_column(StringUtils.bytes("CustomerDetails"));
  27:             cf.setColumn(StringUtils.bytes("customerFirstName"));
  28:             keyspace.insert(keyValue, cf, StringUtils.bytes(custFirstName));
  29:             cf.setColumn(StringUtils.bytes("customerLastName"));
  30:             keyspace.insert(keyValue, cf, StringUtils.bytes(custLastName));
  31:         }
  32:         //Insert product sales total for that customer 
  33:         String prodName = results.getString("PROD_NAME");
  34:         Float SumAmountSold = results.getFloat("SUM_AMOUNT_SOLD");
  35:         Float SumQuantitySold = results.getFloat("SUM_QUANTITY_SOLD");
  36:         //Supercolumn name is the product name 
  37:         cf.setSuper_column(StringUtils.bytes(prodName));
  38:         cf.setColumn(StringUtils.bytes("AmountSold"));
  39:         keyspace.insert(keyValue, cf, StringUtils.bytes(SumAmountSold.toString()));
  40:         cf.setColumn(StringUtils.bytes("QuantitySold"));
  41:         keyspace.insert(keyValue, cf, StringUtils.bytes(SumQuantitySold.toString()));
  42:         
  43:         lastCustId = custId;
  44:         rows++;
  45:     }
  46:     System.out.println(rows + " rows loaded into " + cf.getColumn_family());
  47: }

This code is fairly straightforward,  but let’s step through it anyway:

Lines Description
7-14 Execute the Oracle SQL to get product summaries for each customer
17 Loop through each row returned (one row per product per customer)
21 Check to see if this is a completely new customer
26-30 If it is a new customer, create the CustomerDetails SuperColumn for that customer. The SuperColumn name is “CustomerDetails” and it contains columns for Firstname and Lastname.
37-41 Now we create a SuperColumn for a specific product, still keyed to the customer. The SuperColumn name is set to the name of the product (line 37). Inside the SuperColumn are placed the columns “AmountSold” (lines 38-39) and “QuantitySold” (lines 40-41).



Querying SuperColumns

 

Inserting master-detail relationships into a SuperColumn column family was easy enough. I had a lot more difficulty writing code to query the data. The tricky part seems to be when you don't know the name of the SuperColumn you want to read from. There's no direct equivalent of the JDBC ResultSetMetaData object for querying the SuperColumn names - instead you create a "SuperSlice" predicate that defines a range of SuperColumns that you want to retrieve. It's a bit awkward to express the simple case in which you want to return all the SuperColumns.

Below is a bit of code which retrieves sales totals for a specific customer id.  I suspect I've made a few newbie mistakes :-):

   1: public static void querySuperColumn(Keyspace keyspace, String cfName,
   2:         String keyValue) {
   3:  
   4:     ColumnPath colFamily = new ColumnPath(cfName);
   5:     System.out.println("Details for customer id " + keyValue);
   6:  
   7:     /* Get Customer Details */
   8:     colFamily.setSuper_column(StringUtils.bytes("CustomerDetails"));
   9:     SuperColumn custDetailsSc = keyspace
  10:             .getSuperColumn(keyValue, colFamily);
  11:     for (Column col : custDetailsSc.getColumns()) {
  12:         String colName = StringUtils.string(col.getName()); 
  13:         String colValue = StringUtils.string(col.getValue()); 
  14:         System.out.printf("\t%-20s:%-20s\n", colName, colValue);
  15:     }
  16:     /* Get dynamic columns -  */
  17:     ColumnParent colParent = new ColumnParent(cfName);
  18:     SliceRange sliceRange = new SliceRange(StringUtils.bytes(""), StringUtils
  19:             .bytes(""), false, Integer.MAX_VALUE); // NB: "2 ^ 32" would be XOR in Java, so use a large int count
  20:     SlicePredicate slicePredicate = new SlicePredicate();
  21:     slicePredicate.setSlice_range(sliceRange);
  22:     //TODO:  Surely there's an easier way to select all SC than the above??
  23:     List superSlice = keyspace.getSuperSlice(keyValue,
  24:             colParent, slicePredicate);
  25:     for (SuperColumn prodSuperCol : superSlice) {  //For each super column
  26:         String superColName = StringUtils.string(prodSuperCol.getName());
  27:         if (!superColName.equals("CustomerDetails")) { // Already displayed
  28:                                                          
  29:             System.out.printf("\n%50s:", superColName); // product Name 
  30:             List columns1 = prodSuperCol.getColumns();
  31:             for (Column col : columns1) {               // product data 
  32:                 String colName = StringUtils.string(col.getName()); 
  33:                 String colValue = StringUtils.string(col.getValue()); 
  34:                 System.out.printf("\t%20s:%-20s", colName, colValue);
  35:  
  36:             }
  37:         }
  38:     }
  39:  
  40: }

Lines Description
8-9 Set the superColumn to the “CustomerDetails” supercolumn
11-14 Retrieve the column values (firstname, surname) for the CustomerDetails supercolumn
17-21 Set up a “SlicePredicate” that defines the supercolumns to be queried.  I want to get all of the supercolumns (eg every product), so I set up an unbounded range (line 18) and supply that to the slice predicate (line 21)
23 Create a list of supercolumns.  This will include all the SuperColumns in the column family (including, unfortunately,  CustomerDetails)
27 Eliminate CustomerDetails from the result.  Here we only want product names
30-35 Iterate through the columns in each supercolumn. This will extract QuantitySold and AmountSold for each product name

 

Here’s some output from the Java program. It prints out customer details and product sales totals for customer #101000:

Details for customer id 101000
customerFirstName :Aidan
customerLastName :Wilbur

CD-RW, High Speed Pack of 5: AmountSold:11.99 QuantitySold:1.0
Keyboard Wrist Rest: AmountSold:11.99 QuantitySold:1.0
Multimedia speakers- 3" cones: AmountSold:44.99 QuantitySold:1.0


SuperColumns with Toad for Cloud Databases 

 

Toad for Cloud Databases now has Cassandra support, which makes querying SuperColumns a lot easier. SuperColumns that have dynamic names but a uniform internal column structure (as in my example above) are represented by Toad for Cloud Databases as a detail table. To put it another way, Toad for Cloud Databases re-normalizes the data – displaying it in the format that we would typically use in an RDBMS.

So when we point Toad for Cloud Databases at our SalesByCustomer column family, it maps the column family to two tables: one for CustomerDetails and the other – which by default it will call “SalesByCustomersuper_column” – for product sales totals. We can rename the subtable and subtable key during the mapping phase to make it clearer that it represents product details.


Now if we want to extract product details for a particular customer, we can do a SQL join.  Below we build the join in the query builder, but of course we could simply code the SQL by hand as we would for any NoSQL or SQL database supported by Toad for Cloud Databases:


And just to close the loop, here we can see that the Toad for Cloud Databases query returns the same data as the Hector query:


 

Conclusion

 

All NoSQL databases require that we change the way we think about data modelling, and Cassandra is no exception.  SuperColumns are an incredibly powerful construct, but I can’t say that I found them intuitive or easy.  Hopefully APIs and tooling will evolve to make life easier for those of us coming from the relational world.

Playing with Cassandra and Oracle

Cassandra is one of the hottest of the NoSQL databases. From a production DBA’s perspective it’s not hard to see why: while some of the other NoSQL databases offer more programming bells and whistles for the developer, Cassandra is built from the ground up for total and transparent redundancy and scalability – close to the heart of every DBA.

However, Cassandra involves some complex data modelling concepts – mainly the notorious SuperColumn concept – and I don’t think I’ll ever understand it fully until I’ve played directly with some data. To that end, I thought I’d start by trying to model some familiar Oracle sample schemas in Cassandra.

Toad for Cloud Databases is releasing support for Cassandra early next month (i.e., September 2010), so I’ve been using that – as well as Java, of course – to try to get some initial data loaded.

For other NoSQL databases, Toad for Cloud lets us create NoSQL tables from relational tables with a couple of clicks. Unfortunately, we can’t do that with Cassandra, since you can’t create a ColumnFamily on the fly. So my first Cassandra task was to write a simple program to take an Oracle table (or query) and create a matching column family.

Getting started

Getting started with Cassandra was surprisingly easy. I followed the instructions at http://schabby.de/cassandra-installation-configuration/ to install Cassandra on my laptop, and installed the Hector Java interface from http://prettyprint.me/2010/02/23/hector-a-java-cassandra-client/.

Terminology in NoSQL can be confusing, with each NoSQL database using terms differently from each other, and all of them using terms differently from RDBMS.  In Cassandra:

  • A Keyspace is like a schema
  • A ColumnFamily is roughly like a table

Things get very funky when SuperColumns are introduced, but let’s skip that for now.

To create a ColumnFamily in Cassandra 0.6, we have to add its name to the storage-conf.xml file which is in the Conf directory and then restart Cassandra.  In 0.7 there’ll be a way to do this without restarting the server.

Here is where I created a keyspace called “Guy” and created some ColumnFamilies to play with:

<Keyspace Name="Guy">
  <ColumnFamily Name="G_Employees" CompareWith="UTF8Type"/>
  <ColumnFamily Name="G_Employees2" CompareWith="UTF8Type"/>
  <ColumnFamily Name="G_Employees3" CompareWith="UTF8Type"/>
  <ReplicaPlacementStrategy>org.apache.cassandra.locator.RackUnawareStrategy</ReplicaPlacementStrategy>
  <ReplicationFactor>1</ReplicationFactor>
  <EndPointSnitch>org.apache.cassandra.locator.EndPointSnitch</EndPointSnitch>
</Keyspace>

 

Loading data

 

I wrote some Java code that takes a SQL statement and loads the result set directly into a column family. Here’s the critical method (the complete Java program, with a command-line interface, is here):

   1: private static void oracle2Cassandra(Connection oracleConn,
   2:         Keyspace keyspace, String cfName, String sqlText)
   3:         throws SQLException {
   4:     int rows = 0;
   5:     ColumnPath cf = new ColumnPath(cfName);
   6:     Statement oraQuery = oracleConn.createStatement();
   7:     ResultSet result = oraQuery.executeQuery(sqlText);
   8:     ResultSetMetaData rsmd = result.getMetaData();
   9:     while (result.next()) { // For each row in the output
  10:         // The first column in the result set must be the key value
  11:         String keyValue = result.getString(1);
  12:         // Iterate through the other columns in the result set
  13:         for (int colId = 2; colId <= rsmd.getColumnCount(); colId++) {
  14:             String columnName = rsmd.getColumnName(colId);
  15:             String columnValue = result.getString(colId);
  16:             if (!result.wasNull()) {
  17:                 cf.setColumn(StringUtils.bytes(columnName));
  18:                 keyspace.insert(keyValue, cf, StringUtils
  19:                         .bytes(columnValue));
  20:             }
  21:         }
  22:         rows++;
  23:     }
  24:     System.out.println(rows + " rows loaded into " + cf.getColumn_family());
  25: }

The method takes an Oracle connection and a SQL statement, and pushes the data from that SQL into the specified Cassandra column family and keyspace. The first column returned by the query is used as the key for the Cassandra data.

Lines 6-8 execute the statement and retrieve a ResultSet object – which contains the data – and a ResultSetMetaData object, which contains the column names. Lines 9-21 just iterate through the rows and columns and create matching entries in the column family. We use the Hector setColumn method to set the name of the column and the insert method to apply the column value. Too easy!
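To show how the pieces fit together, here’s a hypothetical way the method above might be invoked. It assumes the Hector 0.6-era client-pool API, a single local Cassandra node, the relevant Hector and JDBC imports, and that this main method lives in the same class as oracle2Cassandra; the connection details are placeholders.

public static void main(String[] args) throws Exception {
    // Borrow a Hector client and get a handle on the "Guy" keyspace
    CassandraClientPool pool = CassandraClientPoolFactory.INSTANCE.get();
    CassandraClient client = pool.borrowClient("localhost", 9160);
    Keyspace keyspace = client.getKeyspace("Guy");

    // Oracle connection (URL and credentials are placeholders)
    Connection oracleConn = DriverManager.getConnection(
            "jdbc:oracle:thin:@//localhost:1521/orcl", "guy", "password");

    // The first column of the query becomes the row key; the rest become columns
    oracle2Cassandra(oracleConn, keyspace, "G_Employees",
            "SELECT employee_id, first_name, last_name, salary FROM hr.employees");

    pool.releaseClient(client);
    oracleConn.close();
}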

Of course, I’d have no idea whether my job had worked if I didn’t have Toad for Cloud Databases available. Using TCD, I can map the Cassandra ColumnFamily to a TCD “table” and browse that table (i.e., the Cassandra column family) to see the resulting data:


I can even use SQL to join the Cassandra data to the Oracle data to make absolutely certain that the data transfer went OK:


 

It’s surprisingly easy to get started with Cassandra. Installation of a test cluster is a breeze, and the Hector Java API is straightforward. Of course, direct mapping of RDBMS tables to Cassandra ColumnFamilies doesn’t involve the complexities of advanced Cassandra data models using variable columns and SuperColumns. Next, I’m going to try to map a more complex ColumnFamily to multiple Oracle tables – hopefully it won’t make my brain hurt too much!

Toad for Cloud Databases is introducing Cassandra support in the 1.1 release, due out within the next two weeks. It’s a free download from toadforcloud.com.

Consistency models in Non-relational Databases

One of the most significant differences between the new generation of non-relational (AKA NoSQL) databases and the traditional RDBMS is the way in which consistency of data is handled.  In a traditional RDBMS, all users see a consistent view of the data.  Once a user commits a transaction, all subsequent queries will report that transaction and certainly no-one will see partial results of a transaction.

RDBMS transactions are typically described as “ACID” transactions (a minimal JDBC illustration follows the list below). That is, they are:

  • Atomic: The transaction is indivisible – either all the statements in the transaction are applied to the database, or none are.
  • Consistent: The database remains in a consistent state before and after transaction execution.
  • Isolated: While multiple transactions can be executed by one or more users simultaneously, one transaction should not see the effects of other concurrent transactions.
  • Durable: Once a transaction is saved to the database (an action referred to in database programming circles as a commit), its changes are expected to persist.
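As a concrete illustration of atomicity and durability from the application’s point of view, here’s a minimal JDBC sketch: either both statements take effect, or neither does. The connection string and the accounts table are invented for illustration.

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.sql.Statement;

public class AcidTransferSketch {
    public static void main(String[] args) throws SQLException {
        Connection conn = DriverManager.getConnection(
                "jdbc:oracle:thin:@//localhost:1521/orcl", "user", "password");
        conn.setAutoCommit(false); // start an explicit transaction
        try (Statement stmt = conn.createStatement()) {
            stmt.executeUpdate("UPDATE accounts SET balance = balance - 100 WHERE id = 1");
            stmt.executeUpdate("UPDATE accounts SET balance = balance + 100 WHERE id = 2");
            conn.commit();   // durable: both changes persist together
        } catch (SQLException e) {
            conn.rollback(); // atomic: neither change is applied
            throw e;
        } finally {
            conn.close();
        }
    }
}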

As databases become distributed across multiple hosts, maintaining ACID consistency becomes increasingly difficult. In a transaction that spans multiple independent databases, complex two-phase commit protocols must be employed. In the case of a truly clustered distributed database, even more complex protocols are required, since the state of data in memory and the state of data in various transaction logs and data files must be kept consistent (Cache Fusion in Oracle RAC, for instance).

CAP Theorem:  You can’t have it all

 

In 2000, Eric Brewer outlined the CAP (AKA Brewer’s) theorem. Simplistically, the CAP theorem says that in a distributed database system you can have at most two of the following three characteristics:

  • Consistency: All nodes in the cluster see exactly the same data at any point in time
  • Availability: Failure of a node does not render the database inoperative
  • Partition tolerance:  Nodes can still function when communication with other groups of nodes is lost


Interpretations and implementations of the CAP theorem vary, but most NoSQL database architectures favour partition tolerance and availability over strong consistency.

Eventual Consistency


A compromise between strong consistency and weak (no guarantees) consistency is eventual consistency.

The core of the eventual consistency concept is that although the database may have some inconsistencies at a point in time, it will eventually become consistent should all updates cease.  That is,  inconsistencies are transitory:  eventually all nodes will receive the latest consistent updates.

BASE – Basically Available, Soft-state, Eventually consistent – is an acronym used to contrast this approach with the RDBMS ACID transactions described above.

Not all implementations of eventual consistency are equal. In particular, an eventually consistent database may also elect to provide the following:

  • Causal consistency: This involves a signal being sent between application sessions indicating that a change has occurred. From that point on, the receiving session will always see the updated value.
  • Read your own writes:  In this mode of consistency, a session that performs a change to the database will immediately see that change, even if other sessions experience a delay.
  • Monotonic consistency: In this mode, a session will never see data revert to an earlier point in time. Once we read a value, we will never see an earlier value.

 

The NRW notation

NRW notation describes at a high level how a distributed database will trade off consistency, read performance and write performance.  NRW stands for:

  • N: the number of copies of each data item that the database will maintain. 
  • R: the number of copies that the application will access when reading the data item 
  • W: the number of copies of the data item that must be written before the write can complete.  

When N=W then the database will always write every copy before returning control to the client – this is more or less what traditional databases do when implementing synchronous replication.   If you are more concerned about write performance than read performance, then you can set W=1, R=N.  Then each read must access all copies to determine which is correct, but each write only has to touch a single copy of the data.

Most NoSQL databases use N>W>1: more than one write must complete, but not all nodes need to be updated immediately. You can increase the level of consistency in roughly three stages (summarized in the table and the sketch below):

  1. If R=1, then the database will accept whatever value it reads first.  This might be out of date if not all updates have propagated through the system.   
  2. If R>1, then the database will read more than one value and pick the most recent (or otherwise “correct”) value.
  3. If W+R>N, then a read will always retrieve the latest value,  although it may be mixed in with “older” values.  In other words, the number of copies you write and the number of copies you read is high enough to guarantee that you’ll always have at least one copy of the latest version in your read set.   This is sometimes referred to as quorum assembly. 
     
NRW configuration    Outcome
W=N, R=1             Read-optimized strong consistency
W=1, R=N             Write-optimized strong consistency
W+R<=N               Weak eventual consistency: a read might not see the latest update
W+R>N                Strong consistency through quorum assembly: a read will see at least one copy of the most recent update
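To make the combinations concrete, here is a toy Java sketch that classifies a static (N, R, W) configuration according to the table above. Real stores such as Cassandra or Voldemort let you choose R and W per request; this only illustrates the arithmetic.

public class NrwSketch {

    static String classify(int n, int r, int w) {
        if (w == n && r == 1) return "read-optimized strong consistency";
        if (w == 1 && r == n) return "write-optimized strong consistency";
        if (w + r > n)        return "strong consistency via quorum assembly";
        return "weak/eventual consistency: a read may miss the latest update";
    }

    public static void main(String[] args) {
        System.out.println("N=3, W=3, R=1: " + classify(3, 1, 3)); // every copy written before returning
        System.out.println("N=3, W=2, R=2: " + classify(3, 2, 2)); // W+R > N: quorum assembly
        System.out.println("N=3, W=1, R=1: " + classify(3, 1, 1)); // fast but only eventually consistent
    }
}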

 

NoSQL databases generally try hard to be as consistent as possible, even when configured for weaker consistency.  For instance, the read repair algorithm is often implemented to improve consistency when R=1.  Although the application does not wait for all the copies of a data item to be read,  the database will read all known copies in the background after responding to the initial request.  If the application asks for the data item again, it will therefore see the latest version. 

Vector clocks

NoSQL databases can seem simplistic in some respects, but there are a lot of really clever algorithms going on behind the scenes. For example, the vector clock algorithm can be used to ensure that updates are processed in order (monotonic consistency).

With vector clocks, each node participating in the cluster maintains a change number (or event count), similar to the System Change Number used in some RDBMSs. The “vector” is a list including the current node’s change number as well as the change numbers that have been received from other nodes. When an update is transmitted, the vector is included with the update, and the receiving node compares that vector with the vectors it has already received to determine whether updates are arriving out of sequence. Out-of-sequence updates can be held until the preceding updates appear.
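To make that a little more concrete, here’s a minimal Java sketch of how a receiving node might compare an incoming vector with its own. It’s purely illustrative and not modelled on any particular database’s implementation.

import java.util.HashMap;
import java.util.Map;

public class VectorClockSketch {

    enum Order { BEFORE, AFTER, CONCURRENT, EQUAL }

    // Compare two vectors of per-node change numbers
    static Order compare(Map<String, Long> a, Map<String, Long> b) {
        boolean aBehind = false, bBehind = false;
        Map<String, Long> allNodes = new HashMap<>(a);
        allNodes.putAll(b);
        for (String node : allNodes.keySet()) {
            long av = a.getOrDefault(node, 0L);
            long bv = b.getOrDefault(node, 0L);
            if (av < bv) aBehind = true;
            if (bv < av) bBehind = true;
        }
        if (aBehind && bBehind) return Order.CONCURRENT; // conflicting concurrent updates
        if (aBehind) return Order.BEFORE;                // a happened before b
        if (bBehind) return Order.AFTER;                 // a happened after b
        return Order.EQUAL;
    }

    public static void main(String[] args) {
        Map<String, Long> local = new HashMap<>();
        local.put("node1", 3L);
        local.put("node2", 5L);
        Map<String, Long> incoming = new HashMap<>();
        incoming.put("node1", 4L);
        incoming.put("node2", 5L);
        // BEFORE: the incoming update has seen everything this node has, so apply it.
        // A CONCURRENT result would mean the update should be held or reconciled.
        System.out.println(compare(local, incoming));
    }
}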

I found vector clocks hard to understand until I read the description in Operating Systems: Concurrent and Distributed Software Design  by Jean Bacon and Tim Harris (Addison-Wesley).

Amazon’s Dynamo

A lot of the eventual consistency concepts were best articulated by Amazon, in Werner Vogels’ Eventually Consistent paper and in Amazon’s paper on the Dynamo eventually consistent key-value store. Dynamo implements most of the ideas above, and is the inspiration for several well-known NoSQL datastores, including Voldemort and – together with Google’s BigTable specification – Cassandra.

 

 

 

 

Cloud (AKA NoSQL) Databases and me

I’ve been an RDBMS guy now since about 1988. Around the time my first son Chris was born, the government department where I worked shifted from an ADABAS/MVS environment to an Oracle/VMS system, and I was dropped headfirst into the joy of Oracle 5.1. Since then I’ve been continuously involved with Oracle development, administration and performance, as well as with Sybase, MySQL and SQL Server. So you could say I’m a pretty hard-core old-school SQL database guy.

For the longest time, it looked like the RDBMS was here to stay. But a few years ago, we noticed signs of new models for databases that were better aligned with modern application architectures, the massive “big data” demands of Web 2.0, and the increasing disparity between IO and CPU capabilities. I’ve written and talked about some of these trends over the past two years, and I’ve placed some links to those articles at the end of this post.

What I haven’t been able to talk much about is what we are doing to support these Cloud/NoSQL databases at Quest Software. Quest is a diverse company, with strong offerings in applications, virtualization and Windows management, but we are definitely very dominant in database tools, so you might have been wondering how we planned to move into the next generation of database management tools.

For the past year or so, I’ve been directing a team of developers who are building “Toad for Cloud Databases”, which will provide data management capabilities for NoSQL/Cloud databases in a familiar Toad platform. This Toad will allow developers, DBAs and data analysts to work with data in these databases just as easily as they would with data in an RDBMS. Even better, you’ll be able to move data between an RDBMS and a NoSQL/Cloud database, and issue queries that join data in both.

I can’t say much more about Toad for Cloud Databases in advance of its official release later this month. But I will be starting to blog more frequently on NoSQL topics, both in this blog and on the Toad for Cloud Databases community site that will be active by the end of June.

In this personal blog, I’m going to publish some summaries of the things we’ve learned about the various NoSQL/Cloud databases, especially from the perspective of an RDBMS professional. I’ll also be posting early versions of articles and postings that will eventually go up on the official Toad for Cloud website.

I’m pretty excited about what we’re doing with Toad for Cloud Databases, and I’m looking forward to sharing some of this stuff with you. Of course, we’re still busily working on RDBMS products at Quest, particularly in Melbourne, where we develop SQL Navigator, Spotlight on Oracle, Spotlight on Oracle RAC, Spotlight on MySQL and Spotlight on SQL Server Enterprise.

References

 

As promised, here are links to my previous articles on next-generation databases. Some are a bit dated now, but they give you an idea of how the new world has emerged from my (SQL guy) perspective:

Is the next DBMS Revolution Looming? – Database Trends and Applications, June 2008

End of the one-size fits all RDBMS? – Database Trends and Applications, July 2008

Map Reduce for Business Intelligence and Analytics – Database Trends and Applications, September 2009

What's next for RDBMS? – InfoManagement Direct, May 7, 2009

Hadoop sets its sights on the Enterprise – January 2010

Next Generation Databases – RMOUG presentation, January 2010: http://www.slideshare.net/gharriso/next-generation-databases