Sunday, June 10, 2012

JDBC Storage Handler for Hive

I was able to complete the implementation of Hive JDBC Storage Handler with basic functionality. Therefore I thought to write a blog post describing the usage with some sample queries. Currently It supports writing into any database and reading from major databases (MySql, MsSql, Oracle, H2, PostgreSQL). This feature comes with WSO2 BAM 2.0.0 release. 

Setting up the BAM to use Hive jdbc-handler. 


Please add your jdbc-driver to $BAM_HOME/repository/component/lib directory, before starting the server. 

Web UI for executing Hive queries.

BAM2 comes with a web ui for executing the Hive queries. Also there is a option to schedule the script

 
User interface for writing Hive Queries


User interface for scheduling hive script

Sample on writing analyzed data into JDBC 


Here I am going to demonstrate the functionality of writing the analyzed data into JDBC storage. In this simple example, We'll fetch records from a file then analyze it using hive and finally store those analyzed data into MySQL database. 

Records - These are the records that we are going to analyze.



bread   12      12/01/2012
sugar   20      12/01/2012
milk    5       12/01/2012
tea     33      12/01/2012
soap    10      12/01/2012
tea     9       13/01/2012
bread   21      13/01/2012
sugar   9       13/01/2012
milk    14      13/01/2012
soap    8       13/01/2012
biscuit 10      14/01/2012


Hive Queries



//drop tables if already exist
 
drop table productTable;
 
drop table summarizedTable;
 
CREATE TABLE productTable (product STRING, noOfItems INT, dateOfSold STRING) ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t';
 
//Load the file with above records
 
load data local inpath '/opt/sample/data/productInfo.txt' into table productTable;
 
CREATE EXTERNAL TABLE IF NOT EXISTS
summarizedTable( product STRING, itemsSold INT) 
STORED BY 'org.wso2.carbon.hadoop.hive.jdbc.storage.JDBCStorageHandler'
    TBLPROPERTIES (
                'mapred.jdbc.driver.class' = 'com.mysql.jdbc.Driver',
                'mapred.jdbc.url' = 'jdbc:mysql://localhost/test',
                'mapred.jdbc.username' = 'username',
                'mapred.jdbc.password' = 'password',
                'hive.jdbc.update.on.duplicate'= 'true',
                'hive.jdbc.primary.key.fields'='product',
                'hive.jdbc.table.create.query''CREATE TABLE productSummary (product VARCHAR(50) NOT NULL PRIMARY KEY, itemsSold INT NOT NULL)');
 
insert overwrite table summarizedTable SELECT product, sum(noOfItems) FROM productTable GROUP BY product;


View the result in mysql.

mysql> select * from productSummary;
+---------+-----------+
| product | itemsSold |
+---------+-----------+
| biscuit |        10 |
| bread   |        33 |
| milk    |        19 |
| soap    |        18 |
| sugar   |        29 |
| tea     |        42 |
+---------+-----------+
6 rows in set (0.00 sec)


Detail description on TBLPROPERTIES in storage handler.



Property name Required Detail
mapred.jdbc.driver.class Yes
The classname for the JDBC Driver to use. This should be available on Hive's classpath.
mapred.jdbc.url  YesThe connection url for the database.
mapred.jdbc.username NoThe database username, if it's required.
mapred.jdbc.password  No The database Password, if it's required.
hive.jdbc.table.create.query No
If table already exist in the database, then you don't need this. Otherwise you should provide the sql query for creating the table in the database.

mapred.jdbc.output.table.name  No
The name of the table in the database. It does not have to be the same as the name of the table in Hive. If you have specified the sql query for creating the table, handler will pick the table name from query. Otherwise you need to specify this if your meta table name is different from the table in database.
hive.jdbc.primary.key.fields YesIf you have any primary keys in the database table
hive.jdbc.update.on.duplicate No
Expected values are either "true" or "false". If "true" then the storage handler will update the records with duplicate keys. Otherwise it will insert all data. 

hive.jdbc.output.upsert.queryNo
This can be use to optimize the update operation. The default implementation is  to use insert or update statement after the select statement. So there will be two database round trips. But we can reduce it to one by using db specific upsert statement. Example query for mysql database is 'INSERT INTO productSummary (product, itemsSold) values (?,?) ON DUPLICATE KEY UPDATE itemsSold=?'

hive.jdbc.upsert.query.values.order No
If you are using an upsert query then this is mandatory. sample values for above query will be 'product,itemsSold,itemsSold' //values order for each question mark 

hive.jdbc.input.columns.mapping No
This is mandatory if your field names in meta table and database tables are different. Provide the field names in database table in the same order as the field names in meta table with ',' separated values. example: productNames,noOfItemsSold. These will map to your meta table with product,itemsSold field names.

mapred.jdbc.input.table.name No
Used when reading from a database table. This is needed if the meta table name and database table name are different.



Sample on reading from JDBC.


Now I am going to read the previously saved records from mysql using hive jdbc-handler.

Hive queries



//drop table if already exists
drop table savedRecords;
 
CREATE EXTERNAL TABLE IF NOT EXISTS savedRecords( product STRING, itemsSold INT) 
STORED BY 'org.wso2.carbon.hadoop.hive.jdbc.storage.JDBCStorageHandler'        
             TBLPROPERTIES (                
                    'mapred.jdbc.driver.class' = 'com.mysql.jdbc.Driver',
                    'mapred.jdbc.url' = 'jdbc:mysql://localhost/test', 
                    'mapred.jdbc.username' = 'username',     
                    'mapred.jdbc.password' = 'password',
                    'mapred.jdbc.input.table.name' = 'productSummary');
SELECT product,itemsSold FROM savedRecords ORDER BY itemsSold;

This will give all the records in the productSummary table.

6 comments:

MANOJ BABU said...

Hi Kasun,
The info you have provided is very useful and its very interesting.

It will more useful if could provide more info on BAM and the jar
org.wso2.carbon.hadoop.hive.jdbc.storage.JDBCStorageHandler

MANOJ BABU said...
This comment has been removed by the author.
Kasun Weranga Gunathilake said...

Hi Manoj,

You can find more information about BAM from here [1], [2]
WSO2 BAM ships this hive jdbc handler. You can try it out.


[1]http://docs.wso2.org/wiki/display/BAM200/WSO2+Business+Activity+Monitor+Documentation
[2] http://wso2.com/products/business-activity-monitor/

Sanky said...

Hi Kausn,
I am trying in same way as you have explained but I am getting exception stack trace as -

TID: [] [WSO2 Business Activity Monitor] [2012-10-12 08:23:14,758] INFO {org.wso2.carbon.cassandra.server.CarbonCassandraAuthenticator} - The key is not present in the cache... {org.wso2.carbon.cassandra.server.CarbonCassandraAuthenticator}
TID: [] [WSO2 Business Activity Monitor] [2012-10-12 08:23:40,166] ERROR {org.wso2.carbon.hadoop.hive.jdbc.storage.db.DBManager} - Failed to get connection {org.wso2.carbon.hadoop.hive.jdbc.storage.db.DBManager}
org.h2.jdbc.JdbcSQLException: Connection is broken: "null" [90067-140]
at org.h2.message.DbException.getJdbcSQLException(DbException.java:327)
at org.h2.message.DbException.get(DbException.java:156)
at org.h2.engine.SessionRemote.connectServer(SessionRemote.java:331)
at org.h2.engine.SessionRemote.connectEmbeddedOrServer(SessionRemote.java:253)
at org.h2.engine.SessionRemote.createSession(SessionRemote.java:219)
at org.h2.jdbc.JdbcConnection.(JdbcConnection.java:111)
at org.h2.jdbc.JdbcConnection.(JdbcConnection.java:95)
at org.h2.Driver.connect(Driver.java:73)
at java.sql.DriverManager.getConnection(DriverManager.java:582)
at java.sql.DriverManager.getConnection(DriverManager.java:185)
at org.wso2.carbon.hadoop.hive.jdbc.storage.db.DBManager.createConnection(DBManager.java:62)
at org.wso2.carbon.hadoop.hive.jdbc.storage.db.DBManager.createConnection(DBManager.java:74)
at org.wso2.carbon.hadoop.hive.jdbc.storage.JDBCDataOutputFormat.getHiveRecordWriter(JDBCDataOutputFormat.java:48)
at org.apache.hadoop.hive.ql.io.HiveFileFormatUtils.getRecordWriter(HiveFileFormatUtils.java:247)
at org.apache.hadoop.hive.ql.io.HiveFileFormatUtils.getHiveRecordWriter(HiveFileFormatUtils.java:235)
at org.apache.hadoop.hive.ql.exec.FileSinkOperator.createBucketFiles(FileSinkOperator.java:478)
at org.apache.hadoop.hive.ql.exec.FileSinkOperator.processOp(FileSinkOperator.java:526)
at org.apache.hadoop.hive.ql.exec.Operator.process(Operator.java:471)
at org.apache.hadoop.hive.ql.exec.Operator.forward(Operator.java:762)
at org.apache.hadoop.hive.ql.exec.SelectOperator.processOp(SelectOperator.java:84)
at org.apache.hadoop.hive.ql.exec.Operator.process(Operator.java:471)
at org.apache.hadoop.hive.ql.exec.Operator.forward(Operator.java:762)
at org.apache.hadoop.hive.ql.exec.SelectOperator.processOp(SelectOperator.java:84)
at org.apache.hadoop.hive.ql.exec.Operator.process(Operator.java:471)
at org.apache.hadoop.hive.ql.exec.Operator.forward(Operator.java:762)
at org.apache.hadoop.hive.ql.exec.GroupByOperator.forward(GroupByOperator.java:959)
at org.apache.hadoop.hive.ql.exec.GroupByOperator.closeOp(GroupByOperator.java:1012)
at org.apache.hadoop.hive.ql.exec.Operator.close(Operator.java:557)
at org.apache.hadoop.hive.ql.exec.ExecReducer.close(ExecReducer.java:303)
at org.apache.hadoop.mapred.ReduceTask.runOldReducer(ReduceTask.java:528)
at org.apache.hadoop.mapred.ReduceTask.run(ReduceTask.java:419)
at org.apache.hadoop.mapred.LocalJobRunner$Job.run(LocalJobRunner.java:256)
Caused by: java.net.UnknownHostException: null
at java.net.Inet4AddressImpl.lookupAllHostAddr(Native Method)
at java.net.InetAddress$1.lookupAllHostAddr(InetAddress.java:849)
at java.net.InetAddress.getAddressFromNameService(InetAddress.java:1202)
at java.net.InetAddress.getAllByName0(InetAddress.java:1153)
at java.net.InetAddress.getAllByName(InetAddress.java:1083)
at java.net.InetAddress.getAllByName(InetAddress.java:1019)
at java.net.InetAddress.getByName(InetAddress.java:969)
at org.h2.util.NetUtils.createSocket(NetUtils.java:90)
at org.h2.engine.SessionRemote.initTransfer(SessionRemote.java:96)
at org.h2.engine.SessionRemote.connectServer(SessionRemote.java:327)
... 29 more
TID: [] [WSO2 Business Activity Monitor] [2012-10-12 08:23:42,870] ERROR {ExecReducer} - Hit error while closing operators - failing tree {ExecReducer}

Kasun Weranga Gunathilake said...

Hi Sanky,

Please make sure you have executed the drop table command, before the create table command. If you have change some property in hive create table query.

Thanks,
Kasun.

Jonah P said...

Thhank you