I was able to complete the implementation of the Hive JDBC Storage Handler with basic functionality, so I thought I would write a blog post describing its usage with some sample queries. Currently it supports writing to any database and reading from the major databases (MySQL, MSSQL, Oracle, H2, PostgreSQL). This feature comes with the WSO2 BAM 2.0.0 release.
Setting up BAM to use the Hive JDBC handler.
Please add your JDBC driver to the $BAM_HOME/repository/components/lib directory before starting the server.
Web UI for executing Hive queries.
BAM 2 comes with a web UI for executing Hive queries. There is also an option to schedule the scripts.
User interface for writing Hive queries
User interface for scheduling a Hive script
Sample: writing analyzed data into JDBC
Here I am going to demonstrate writing analyzed data into JDBC storage. In this simple example, we'll fetch records from a file, analyze them using Hive, and finally store the analyzed data in a MySQL database.
Records - These are the records that we are going to analyze.
bread 12 12/01/2012
sugar 20 12/01/2012
milk 5 12/01/2012
tea 33 12/01/2012
soap 10 12/01/2012
tea 9 13/01/2012
bread 21 13/01/2012
sugar 9 13/01/2012
milk 14 13/01/2012
soap 8 13/01/2012
biscuit 10 14/01/2012
Hive Queries
-- drop the tables if they already exist
drop table productTable;
drop table summarizedTable;
CREATE TABLE productTable (product STRING, noOfItems INT, dateOfSold STRING) ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t';
-- load the file containing the above records
load data local inpath '/opt/sample/data/productInfo.txt' into table productTable;
CREATE EXTERNAL TABLE IF NOT EXISTS
summarizedTable( product STRING, itemsSold INT)
STORED BY 'org.wso2.carbon.hadoop.hive.jdbc.storage.JDBCStorageHandler'
TBLPROPERTIES (
'mapred.jdbc.driver.class' = 'com.mysql.jdbc.Driver',
'mapred.jdbc.url' = 'jdbc:mysql://localhost/test',
'mapred.jdbc.username' = 'username',
'mapred.jdbc.password' = 'password',
'hive.jdbc.update.on.duplicate'= 'true',
'hive.jdbc.primary.key.fields'='product',
'hive.jdbc.table.create.query' = 'CREATE TABLE productSummary (product VARCHAR(50) NOT NULL PRIMARY KEY, itemsSold INT NOT NULL)');
insert overwrite table summarizedTable SELECT product, sum(noOfItems) FROM productTable GROUP BY product;
View the result in MySQL.
mysql> select * from productSummary;
+---------+-----------+
| product | itemsSold |
+---------+-----------+
| biscuit |        10 |
| bread   |        33 |
| milk    |        19 |
| soap    |        18 |
| sugar   |        29 |
| tea     |        42 |
+---------+-----------+
6 rows in set (0.00 sec)
Detailed description of the TBLPROPERTIES supported by the storage handler.
Property name | Required | Detail |
---|---|---|
mapred.jdbc.driver.class | Yes | The class name of the JDBC driver to use. It must be available on Hive's classpath. |
mapred.jdbc.url | Yes | The connection URL for the database. |
mapred.jdbc.username | No | The database username, if one is required. |
mapred.jdbc.password | No | The database password, if one is required. |
hive.jdbc.table.create.query | No | Not needed if the table already exists in the database. Otherwise, provide the SQL query for creating the table in the database. |
mapred.jdbc.output.table.name | No | The name of the table in the database. It does not have to match the name of the table in Hive. If you have specified the SQL query for creating the table, the handler picks the table name from that query. Otherwise, specify this whenever your meta table name differs from the table name in the database. |
hive.jdbc.primary.key.fields | Yes | The primary key fields, required if the database table has any primary keys. |
hive.jdbc.update.on.duplicate | No | Either "true" or "false". If "true", the storage handler updates records with duplicate keys; otherwise it inserts all data as new rows. |
hive.jdbc.output.upsert.query | No | Can be used to optimize the update operation. The default implementation runs an insert or update statement after a select statement, costing two database round trips; a database-specific upsert statement reduces this to one. An example query for MySQL is 'INSERT INTO productSummary (product, itemsSold) VALUES (?,?) ON DUPLICATE KEY UPDATE itemsSold=?' (see the sketch after this table). |
hive.jdbc.upsert.query.values.order | No | Mandatory if you use an upsert query. The value for the query above would be 'product,itemsSold,itemsSold', i.e. the value order for each question mark. |
hive.jdbc.input.columns.mapping | No | Mandatory if the field names in the meta table and the database table differ. Provide the database table's field names, comma separated, in the same order as the meta table's fields. Example: productNames,noOfItemsSold maps to the meta table fields product,itemsSold. |
mapred.jdbc.input.table.name | No | Used when reading from a database table. Needed if the meta table name and the database table name differ. |
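To make the two upsert properties concrete, here is a minimal sketch. It assumes the same MySQL productSummary table as in the sample above; note that the ON DUPLICATE KEY UPDATE syntax is MySQL-specific.

-- Sketch: write to the same productSummary table, but with a MySQL-specific
-- upsert statement so each row costs one database round trip instead of two.
drop table summarizedTable;
CREATE EXTERNAL TABLE IF NOT EXISTS
summarizedTable( product STRING, itemsSold INT)
STORED BY 'org.wso2.carbon.hadoop.hive.jdbc.storage.JDBCStorageHandler'
TBLPROPERTIES (
'mapred.jdbc.driver.class' = 'com.mysql.jdbc.Driver',
'mapred.jdbc.url' = 'jdbc:mysql://localhost/test',
'mapred.jdbc.username' = 'username',
'mapred.jdbc.password' = 'password',
'hive.jdbc.primary.key.fields' = 'product',
'mapred.jdbc.output.table.name' = 'productSummary',
'hive.jdbc.output.upsert.query' = 'INSERT INTO productSummary (product, itemsSold) VALUES (?,?) ON DUPLICATE KEY UPDATE itemsSold=?',
'hive.jdbc.upsert.query.values.order' = 'product,itemsSold,itemsSold');

Since there is no table-creation query here, mapred.jdbc.output.table.name tells the handler which database table to write to.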
Sample: reading from JDBC
Now I am going to read the previously saved records from MySQL using the Hive JDBC handler.
Hive queries
-- drop the table if it already exists
drop table savedRecords;
CREATE EXTERNAL TABLE IF NOT EXISTS savedRecords( product STRING, itemsSold INT)
STORED BY 'org.wso2.carbon.hadoop.hive.jdbc.storage.JDBCStorageHandler'
TBLPROPERTIES (
'mapred.jdbc.driver.class' = 'com.mysql.jdbc.Driver',
'mapred.jdbc.url' = 'jdbc:mysql://localhost/test',
'mapred.jdbc.username' = 'username',
'mapred.jdbc.password' = 'password',
'mapred.jdbc.input.table.name' = 'productSummary');
SELECT product,itemsSold FROM savedRecords ORDER BY itemsSold;
This will give all the records in the productSummary table.
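If the column names in the database table differ from the Hive field names, hive.jdbc.input.columns.mapping bridges them. Here is a minimal sketch, where productDetails, productName, and soldCount are hypothetical names used only for illustration:

-- Sketch: map the database columns productName and soldCount (hypothetical
-- names) onto the Hive fields product and itemsSold, in that order.
drop table mappedRecords;
CREATE EXTERNAL TABLE IF NOT EXISTS mappedRecords( product STRING, itemsSold INT)
STORED BY 'org.wso2.carbon.hadoop.hive.jdbc.storage.JDBCStorageHandler'
TBLPROPERTIES (
'mapred.jdbc.driver.class' = 'com.mysql.jdbc.Driver',
'mapred.jdbc.url' = 'jdbc:mysql://localhost/test',
'mapred.jdbc.username' = 'username',
'mapred.jdbc.password' = 'password',
'mapred.jdbc.input.table.name' = 'productDetails',
'hive.jdbc.input.columns.mapping' = 'productName,soldCount');
SELECT product, itemsSold FROM mappedRecords;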
Comments:
Hi Kasun,
The info you have provided is very useful and it's very interesting.
It would be even more useful if you could provide more info on BAM and the jar
org.wso2.carbon.hadoop.hive.jdbc.storage.JDBCStorageHandler
Hi Manoj,
You can find more information about BAM here: [1], [2].
WSO2 BAM ships with this Hive JDBC handler. You can try it out.
[1] http://docs.wso2.org/wiki/display/BAM200/WSO2+Business+Activity+Monitor+Documentation
[2] http://wso2.com/products/business-activity-monitor/
Hi Kasun,
I am trying it the same way you have explained, but I am getting the following exception stack trace:
TID: [] [WSO2 Business Activity Monitor] [2012-10-12 08:23:14,758] INFO {org.wso2.carbon.cassandra.server.CarbonCassandraAuthenticator} - The key is not present in the cache... {org.wso2.carbon.cassandra.server.CarbonCassandraAuthenticator}
TID: [] [WSO2 Business Activity Monitor] [2012-10-12 08:23:40,166] ERROR {org.wso2.carbon.hadoop.hive.jdbc.storage.db.DBManager} - Failed to get connection {org.wso2.carbon.hadoop.hive.jdbc.storage.db.DBManager}
org.h2.jdbc.JdbcSQLException: Connection is broken: "null" [90067-140]
at org.h2.message.DbException.getJdbcSQLException(DbException.java:327)
at org.h2.message.DbException.get(DbException.java:156)
at org.h2.engine.SessionRemote.connectServer(SessionRemote.java:331)
at org.h2.engine.SessionRemote.connectEmbeddedOrServer(SessionRemote.java:253)
at org.h2.engine.SessionRemote.createSession(SessionRemote.java:219)
at org.h2.jdbc.JdbcConnection.<init>(JdbcConnection.java:111)
at org.h2.jdbc.JdbcConnection.<init>(JdbcConnection.java:95)
at org.h2.Driver.connect(Driver.java:73)
at java.sql.DriverManager.getConnection(DriverManager.java:582)
at java.sql.DriverManager.getConnection(DriverManager.java:185)
at org.wso2.carbon.hadoop.hive.jdbc.storage.db.DBManager.createConnection(DBManager.java:62)
at org.wso2.carbon.hadoop.hive.jdbc.storage.db.DBManager.createConnection(DBManager.java:74)
at org.wso2.carbon.hadoop.hive.jdbc.storage.JDBCDataOutputFormat.getHiveRecordWriter(JDBCDataOutputFormat.java:48)
at org.apache.hadoop.hive.ql.io.HiveFileFormatUtils.getRecordWriter(HiveFileFormatUtils.java:247)
at org.apache.hadoop.hive.ql.io.HiveFileFormatUtils.getHiveRecordWriter(HiveFileFormatUtils.java:235)
at org.apache.hadoop.hive.ql.exec.FileSinkOperator.createBucketFiles(FileSinkOperator.java:478)
at org.apache.hadoop.hive.ql.exec.FileSinkOperator.processOp(FileSinkOperator.java:526)
at org.apache.hadoop.hive.ql.exec.Operator.process(Operator.java:471)
at org.apache.hadoop.hive.ql.exec.Operator.forward(Operator.java:762)
at org.apache.hadoop.hive.ql.exec.SelectOperator.processOp(SelectOperator.java:84)
at org.apache.hadoop.hive.ql.exec.Operator.process(Operator.java:471)
at org.apache.hadoop.hive.ql.exec.Operator.forward(Operator.java:762)
at org.apache.hadoop.hive.ql.exec.SelectOperator.processOp(SelectOperator.java:84)
at org.apache.hadoop.hive.ql.exec.Operator.process(Operator.java:471)
at org.apache.hadoop.hive.ql.exec.Operator.forward(Operator.java:762)
at org.apache.hadoop.hive.ql.exec.GroupByOperator.forward(GroupByOperator.java:959)
at org.apache.hadoop.hive.ql.exec.GroupByOperator.closeOp(GroupByOperator.java:1012)
at org.apache.hadoop.hive.ql.exec.Operator.close(Operator.java:557)
at org.apache.hadoop.hive.ql.exec.ExecReducer.close(ExecReducer.java:303)
at org.apache.hadoop.mapred.ReduceTask.runOldReducer(ReduceTask.java:528)
at org.apache.hadoop.mapred.ReduceTask.run(ReduceTask.java:419)
at org.apache.hadoop.mapred.LocalJobRunner$Job.run(LocalJobRunner.java:256)
Caused by: java.net.UnknownHostException: null
at java.net.Inet4AddressImpl.lookupAllHostAddr(Native Method)
at java.net.InetAddress$1.lookupAllHostAddr(InetAddress.java:849)
at java.net.InetAddress.getAddressFromNameService(InetAddress.java:1202)
at java.net.InetAddress.getAllByName0(InetAddress.java:1153)
at java.net.InetAddress.getAllByName(InetAddress.java:1083)
at java.net.InetAddress.getAllByName(InetAddress.java:1019)
at java.net.InetAddress.getByName(InetAddress.java:969)
at org.h2.util.NetUtils.createSocket(NetUtils.java:90)
at org.h2.engine.SessionRemote.initTransfer(SessionRemote.java:96)
at org.h2.engine.SessionRemote.connectServer(SessionRemote.java:327)
... 29 more
TID: [] [WSO2 Business Activity Monitor] [2012-10-12 08:23:42,870] ERROR {ExecReducer} - Hit error while closing operators - failing tree {ExecReducer}
Hi Sanky,
Please make sure you execute the drop table command before the create table command if you have changed some property in the Hive create table query. Since the query uses CREATE EXTERNAL TABLE IF NOT EXISTS, it is a no-op while the old table definition still exists, so changed properties will not take effect until you drop it.
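A minimal sketch of the order, reusing the summarizedTable example from the post:

-- drop the old definition first; the IF NOT EXISTS create is skipped
-- while the table exists, so changed TBLPROPERTIES would be ignored
drop table summarizedTable;
-- then re-run the full CREATE EXTERNAL TABLE statement from the post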
Thanks,
Kasun.