Hadoop Database Access

http://jaguar13.iteye.com/blog/683392/

Hadoop is mainly used to store and analyze unstructured and semi-structured data (e.g. in HBase), while structured data is usually stored in and accessed through a relational database. This article explains how to combine Hadoop with an existing database and access data stored in the database from a Hadoop application.

1. DBInputFormat
DBInputFormat is an input format supported by Hadoop since release 0.19.0. It lives in the package org.apache.hadoop.mapred.lib.db and is used to interact with existing database systems such as MySQL, PostgreSQL, and Oracle. A Hadoop application uses DBInputFormat to talk to the database through the vendor-supplied JDBC driver, and it can read database records with standard SQL. Before using DBInputFormat, the required JDBC driver must be copied into the $HADOOP_HOME/lib/ directory on every node of the cluster.
DBInputFormat contains the following three inner classes:
1. protected class DBRecordReader implements RecordReader<LongWritable, T>: reads records from a database table one tuple at a time.
2. public static class NullDBWritable implements DBWritable, Writable: a default implementation of the DBWritable interface.
3. protected static class DBInputSplit implements InputSplit: describes the range of input tuples; it has two attributes, start and end, where start is the index of the first record and end is the index of the last record.
The DBWritable interface is quite similar to Writable: it also declares write and readFields methods, but with different parameters. The two methods of DBWritable are:
public void write(PreparedStatement statement) throws SQLException;
public void readFields(ResultSet resultSet) throws SQLException;
These two methods set parameters on a java.sql.PreparedStatement and read one record from a java.sql.ResultSet, respectively; anyone familiar with JDBC will recognize how these two classes are used.

2. Reading database records with DBInputFormat
The previous section briefly introduced DBInputFormat and its inner classes. This section describes in detail how to use DBInputFormat to read database records. The steps are as follows:
1. Call DBConfiguration.configureDB(JobConf job, String driverClass, String dbUrl, String userName, String passwd) to configure the JDBC driver, the data source, and the user name and password for database access. For example, the JDBC driver class for MySQL is "com.mysql.jdbc.Driver", and the data source can be set to "jdbc:mysql://localhost/mydb", where mydb is the database to be accessed.
2. Call DBInputFormat.setInput(JobConf job, Class<? extends DBWritable> inputClass, String tableName, String conditions, String orderBy, String... fieldNames) to initialize the input: the record class (which must implement the DBWritable interface), the table name, the conditions the input records must satisfy, the input ordering, and the columns to read. You can also initialize the input with the overloaded setInput(JobConf job, Class<? extends DBWritable> inputClass, String inputQuery, String inputCountQuery); the difference is that the latter is driven directly by standard SQL (see the Hadoop API documentation for details, and the sketch after this list).
3. Configure the rest of the job as for any ordinary Hadoop application, including the Mapper class, the Reducer class, and the input and output formats, then call JobClient.runJob(conf).
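For reference, here is a minimal sketch of the query-based overload, reusing the school database and the TeacherRecord class defined in the next section; the WHERE clause is a hypothetical illustration, not part of the original example:
// Hedged sketch of the query-based setInput() overload (Hadoop 0.19 mapred API).
// The WHERE clause is an illustrative assumption; adjust it to your own schema.
JobConf conf = new JobConf(DBAccess.class);
conf.setInputFormat(DBInputFormat.class);

DBConfiguration.configureDB(conf, "com.mysql.jdbc.Driver",
        "jdbc:mysql://localhost/school", "root", "123456");

// The data query selects the records; the count query must return the number of
// rows the data query produces, so that DBInputFormat can split the input.
String inputQuery =
        "SELECT id, name, age, departmentID FROM teacher WHERE age > 30 ORDER BY id";
String inputCountQuery = "SELECT COUNT(*) FROM teacher WHERE age > 30";

DBInputFormat.setInput(conf, TeacherRecord.class, inputQuery, inputCountQuery);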

3. Example
Suppose a MySQL database school contains a teacher table defined as follows:
DROP TABLE IF EXISTS `school`.`teacher`;
CREATE TABLE `school`.`teacher` (
`id` int(11) default NULL,
`name` char(20) default NULL,
`age` int(11) default NULL,
`departmentID` int(11) default NULL
) ENGINE=InnoDB DEFAULT CHARSET=latin1;

First, here is the TeacherRecord class, which implements the DBWritable interface:
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapred.lib.db.DBWritable;

public class TeacherRecord implements Writable, DBWritable {

    int id;
    String name;
    int age;
    int departmentID;

    // Hadoop serialization: read the fields from a binary stream.
    @Override
    public void readFields(DataInput in) throws IOException {
        this.id = in.readInt();
        this.name = Text.readString(in);
        this.age = in.readInt();
        this.departmentID = in.readInt();
    }

    // Hadoop serialization: write the fields to a binary stream.
    @Override
    public void write(DataOutput out) throws IOException {
        out.writeInt(this.id);
        Text.writeString(out, this.name);
        out.writeInt(this.age);
        out.writeInt(this.departmentID);
    }

    // JDBC: populate the fields from one row of the ResultSet,
    // in the column order given to DBInputFormat.setInput().
    @Override
    public void readFields(ResultSet result) throws SQLException {
        this.id = result.getInt(1);
        this.name = result.getString(2);
        this.age = result.getInt(3);
        this.departmentID = result.getInt(4);
    }

    // JDBC: bind the fields to a PreparedStatement for writing back to the database.
    @Override
    public void write(PreparedStatement stmt) throws SQLException {
        stmt.setInt(1, this.id);
        stmt.setString(2, this.name);
        stmt.setInt(3, this.age);
        stmt.setInt(4, this.departmentID);
    }

    @Override
    public String toString() {
        return this.name + " " + this.age + " " + this.departmentID;
    }

}

DBAccessMapper reads the records one at a time:
import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;

public class DBAccessMapper extends MapReduceBase implements
        Mapper<LongWritable, TeacherRecord, LongWritable, Text> {

    @Override
    public void map(LongWritable key, TeacherRecord value,
            OutputCollector<LongWritable, Text> collector, Reporter reporter)
            throws IOException {
        // The key is the row id supplied by DBInputFormat; emit the record's
        // id as the output key and its textual form as the output value.
        collector.collect(new LongWritable(value.id), new Text(value.toString()));
    }

}

The main class is as follows:
import java.io.IOException;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.lib.IdentityReducer;
import org.apache.hadoop.mapred.lib.db.DBConfiguration;
import org.apache.hadoop.mapred.lib.db.DBInputFormat;

public class DBAccess {

    public static void main(String[] args) throws IOException {
        JobConf conf = new JobConf(DBAccess.class);
        conf.setOutputKeyClass(LongWritable.class);
        conf.setOutputValueClass(Text.class);

        // Read records via DBInputFormat and write plain text files to HDFS.
        conf.setInputFormat(DBInputFormat.class);
        FileOutputFormat.setOutputPath(conf, new Path("dboutput"));

        // JDBC driver class, connection URL, user name, and password.
        DBConfiguration.configureDB(conf, "com.mysql.jdbc.Driver",
                "jdbc:mysql://localhost/school", "root", "123456");

        // Read all four columns of the teacher table, ordered by id.
        String[] fields = {"id", "name", "age", "departmentID"};
        DBInputFormat.setInput(conf, TeacherRecord.class, "teacher",
                null, "id", fields);

        conf.setMapperClass(DBAccessMapper.class);
        conf.setReducerClass(IdentityReducer.class);

        JobClient.runJob(conf);
    }

}

This example reads all records from the teacher table and writes them to the dboutput directory with TextOutputFormat, producing output of the form <"id", "name age departmentID">.

4. Writing records to the database with DBOutputFormat
DBOutputFormat writes the results of a computation back to a database. As before, first call DBConfiguration.configureDB() to configure the database connection, then call DBOutputFormat.setOutput(JobConf job, String tableName, String... fieldNames) to set the target table name and column names. Again, the record class must implement the DBWritable interface before records can be written back. The write(PreparedStatement stmt) method is invoked on every DBWritable instance passed to the OutputCollector in the Reducer, and at the end of the reduce phase the PreparedStatement objects are turned into SQL INSERT statements and executed against the database.
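As a minimal configuration sketch under the same assumptions as the example above (the target table teacher_copy is hypothetical and assumed to have the same columns as teacher):
// Hedged sketch: writing TeacherRecord instances back to MySQL via DBOutputFormat
// (Hadoop 0.19 mapred API). The teacher_copy table is an illustrative assumption.
JobConf conf = new JobConf(DBAccess.class);
conf.setOutputFormat(DBOutputFormat.class);

DBConfiguration.configureDB(conf, "com.mysql.jdbc.Driver",
        "jdbc:mysql://localhost/school", "root", "123456");

// Table name followed by the columns to populate; DBOutputFormat generates an
// INSERT statement with one "?" placeholder per listed column.
DBOutputFormat.setOutput(conf, "teacher_copy", "id", "name", "age", "departmentID");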

5. Summary
DBInputFormat and DBOutputFormat provide a simple interface for accessing databases; simple as the interface is, it is widely applicable. For example, data in an existing database can be dumped into Hadoop, analyzed there at scale with distributed computation, and the results then written back to the database. In a search engine, Hadoop can perform link analysis and ranking over crawled pages and build an inverted index, which is then stored in a database for fast searching. Although this database access interface is sufficient for ordinary data transfer, some limitations remain, such as concurrent access and the requirement that table keys support ordering, and these still await improvement and optimization by the Hadoop community.

Database Access with Hadoop

by Aaron Kimball, March 06, 2009

Hadoop’s strength is that it enables ad-hoc analysis of unstructured or semi-structured data. Relational databases, by contrast, allow for fast queries of very structured data sources. A point of frustration has been the inability to easily query both of these sources at the same time. The DBInputFormat component provided in Hadoop 0.19 finally allows easy import and export of data between Hadoop and many relational databases, allowing relational data to be more easily incorporated into your data processing pipeline.
This blog post explains how the DBInputFormat works and provides an example of using DBInputFormat to import data into HDFS.
DBInputFormat and JDBC

First we’ll cover how DBInputFormat interacts with databases. DBInputFormat uses JDBC to connect to data sources. Because JDBC is widely implemented, DBInputFormat can work with MySQL, PostgreSQL, and several other database systems. Individual database vendors provide JDBC drivers to allow third-party applications (like Hadoop) to connect to their databases. Links to popular drivers are listed in the resources section at the end of this post.
To start using DBInputFormat to connect to your database, you’ll need to download the appropriate database driver from the list in the resources section (see the end of this post), and drop it into the $HADOOP_HOME/lib/ directory on your Hadoop TaskTracker machines, and on the machine where you launch your jobs from.
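If you want to check that a node can actually see the driver, a quick probe like the following will throw a ClassNotFoundException when the jar is missing (a minimal sketch, assuming MySQL's Connector/J driver class):
// Hedged sketch: confirm the JDBC driver jar is on this JVM's classpath.
Class.forName("com.mysql.jdbc.Driver");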
Reading Tables with DBInputFormat

The DBInputFormat is an InputFormat class that allows you to read data from a database. An InputFormat is Hadoop’s formalization of a data source; it can mean files formatted in a particular way, data read from a database, etc. DBInputFormat provides a simple method of scanning entire tables from a database, as well as the means to read from arbitrary SQL queries performed against the database. Most queries are supported, subject to a few limitations discussed at the end of this article.
Configuring the job

To use the DBInputFormat, you’ll need to configure your job. The following example shows how to connect to a MySQL database and load from a table:
CREATE TABLE employees (
employee_id INTEGER NOT NULL PRIMARY KEY,
name VARCHAR(32) NOT NULL);
Listing 1: Example table schema
JobConf conf = new JobConf(getConf(), MyDriver.class);
conf.setInputFormat(DBInputFormat.class);
DBConfiguration.configureDB(conf,
    "com.mysql.jdbc.Driver",
    "jdbc:mysql://localhost/mydatabase");
String[] fields = { "employee_id", "name" };
DBInputFormat.setInput(conf, MyRecord.class, "employees",
    null /* conditions */, "employee_id", fields);
// set Mapper, etc., and call JobClient.runJob(conf);
Listing 2: Java code to set up a MapReduce job with DBInputFormat
This example code will connect to mydatabase on localhost and read the two fields from the employees table.
The configureDB() and setInput() calls configure the DBInputFormat. The first call specifies the JDBC driver implementation to use and what database to connect to. The second call specifies what data to load from the database. The MyRecord class is the class where data will be read into in Java, and "employees" is the name of the table to read. The "employee_id" parameter specifies the table's primary key, used for ordering results. The section "Limitations of the InputFormat" below explains why this is necessary. Finally, the fields array lists what columns of the table to read. An overloaded definition of setInput() allows you to specify an arbitrary SQL query to read from, instead.
After calling configureDB() and setInput(), you should configure the rest of your job as usual, setting the Mapper and Reducer classes, specifying any other data sources to read from (e.g., datasets in HDFS) and other job-specific parameters.
Retrieving the data

The DBInputFormat will read from the database, but how does data get to your mapper? The setInput() method used in the example above took, as a parameter, the name of a class which will hold the contents of one row. You’ll need to write an implementation of the DBWritable interface to allow DBInputFormat to populate your class with fields from the table. DBWritable is an adaptor interface that allows data to be read and written using both Hadoop’s internal serialization mechanism, and using JDBC calls. Once the data is read into your custom class, you can then read the class’ fields in the mapper.
The following example provides a DBWritable implementation that holds one record from the employees table, as described above:
class MyRecord implements Writable, DBWritable {
    long id;
    String name;

    public void readFields(DataInput in) throws IOException {
        this.id = in.readLong();
        this.name = Text.readString(in);
    }

    public void readFields(ResultSet resultSet)
            throws SQLException {
        this.id = resultSet.getLong(1);
        this.name = resultSet.getString(2);
    }

    public void write(DataOutput out) throws IOException {
        out.writeLong(this.id);
        Text.writeString(out, this.name);
    }

    public void write(PreparedStatement stmt) throws SQLException {
        stmt.setLong(1, this.id);
        stmt.setString(2, this.name);
    }
}
Listing 3: DBWritable implementation for records from the employees table
A java.sql.ResultSet object represents the data returned from a SQL statement. It contains a cursor representing a single row of the results. This row will contain the fields specified in the setInput() call. In the readFields() method of MyRecord, we read the two fields from the ResultSet. The readFields() and write() methods that operate on java.io.DataInput and DataOutput objects are part of the Writable interface used by Hadoop to marshal data between mappers and reducers, or pack results into SequenceFiles.
Using the data in a mapper

The mapper then receives an instance of your DBWritable implementation as its input value. The input key is a row id provided by the database; you’ll most likely discard this value.
public class MyMapper extends MapReduceBase
        implements Mapper<LongWritable, MyRecord, LongWritable, Text> {
    public void map(LongWritable key, MyRecord val,
            OutputCollector<LongWritable, Text> output, Reporter reporter) throws IOException {
        // Use val.id, val.name here
        output.collect(new LongWritable(val.id), new Text(val.name));
    }
}
Listing 4: Example mapper using a custom DBWritable
Writing results back to the database

A companion class, DBOutputFormat, will allow you to write results back to a database. When setting up the job, call conf.setOutputFormat(DBOutputFormat.class); and then call DBConfiguration.configureDB() as before.
The DBOutputFormat.setOutput() method then defines how the results will be written back to the database. Its three arguments are the JobConf object for the job, a string defining the name of the table to write to, and an array of strings defining the fields of the table to populate. e.g., DBOutputFormat.setOutput(job, "employees", "employee_id", "name");.
The same DBWritable implementation that you created earlier will suffice to inject records back into the database. The write(PreparedStatement stmt) method will be invoked on each instance of the DBWritable that you pass to the OutputCollector from the reducer. At the end of reducing, those PreparedStatement objects will be turned into INSERT statements to run against the SQL database.
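For illustration, here is a minimal reducer sketch (a hedged example, not from the original post): in the old mapred API, DBOutputFormat writes the record passed as the output key and ignores the value, so NullWritable is used; the class name and input key/value types are assumptions.
import java.io.IOException;
import java.util.Iterator;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;

// Hedged sketch: each MyRecord collected here becomes one row inserted by DBOutputFormat.
public class MyWritingReducer extends MapReduceBase
        implements Reducer<LongWritable, MyRecord, MyRecord, NullWritable> {

    public void reduce(LongWritable key, Iterator<MyRecord> values,
            OutputCollector<MyRecord, NullWritable> output, Reporter reporter)
            throws IOException {
        while (values.hasNext()) {
            output.collect(values.next(), NullWritable.get());
        }
    }
}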
Limitations of the InputFormat

JDBC allows applications to generate SQL queries which are executed against the database; the results are then returned to the calling application. Keep in mind that you will be interacting with your database via repeated SQL queries. Therefore:
Hadoop may need to execute the same query multiple times. It will need to return the same results each time. So any concurrent updates to your database, etc., should not affect the query being run by your MapReduce job. This can be accomplished by disallowing writes to the table while your MapReduce job runs, restricting your MapReduce's query via a clause such as "insert_date < yesterday," or dumping the data to a temporary table in the database before launching your MapReduce process.
In order to parallelize the processing of records from the database, Hadoop will execute SQL queries that use ORDER BY, LIMIT, and OFFSET clauses to select ranges out of tables. Your results, therefore, need to be orderable by one or more keys (either PRIMARY, like the one in the example, or UNIQUE).
In order to set the number of map tasks, the DBInputFormat needs to know how many records it will read. So if you’re writing an arbitrary SQL query against the database, you will need to provide a second query that returns the number of rows that the first query will return (e.g., by using COUNT and GROUP BY).
With these restrictions in mind, there’s still a great deal of flexibility available to you. You can bulk load entire tables into HDFS, or select large ranges of data. For example, if you want to read records from a table that is also being populated by another source concurrently, you might set up that table to attach a timestamp field to each record. Before doing the bulk read, pick the current timestamp, then select all records with timestamps earlier than that one. New records being fed in by the other writer will have later timestamps and will not affect the MapReduce job.
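A small sketch of that snapshot technique using the conditions argument of setInput() (the ts column and the cutoff literal are hypothetical assumptions):
// Hedged sketch: bound the scan to rows older than a cutoff chosen before the job
// starts, so concurrent inserts cannot change the result of repeated queries.
// The "ts" column and the cutoff value are illustrative assumptions.
String cutoff = "2009-03-06 00:00:00";
String[] fields = { "employee_id", "name" };
DBInputFormat.setInput(conf, MyRecord.class, "employees",
        "ts < '" + cutoff + "'" /* conditions */, "employee_id", fields);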
Finally, be careful to understand the bottlenecks in your data processing pipeline. Launching a MapReduce job with 100 mappers performing queries against a database server may overload the server or its network connection. In this case, you’ll achieve less parallelism than theoretically possible, due to starvation, disk seeks, and other performance penalties.
Limitations of the OutputFormat

The DBOutputFormat writes to the database by generating a set of INSERT statements in each reducer. The reducer’s close() method then executes them in a bulk transaction. Performing a large number of these from several reduce tasks concurrently can swamp a database. If you want to export a very large volume of data, you may be better off generating the INSERT statements into a text file, and then using a bulk data import tool provided by your database to do the database import.
Conclusions

DBInputFormat provides a straightforward interface to read data from a database into your MapReduce applications. You can read database tables into HDFS, import them into Hive, or use them to perform joins in MapReduce jobs. By supporting JDBC, it provides a common interface to a variety of different database sources.
This is probably best not used as a primary data access mechanism; queries against database-driven data are most efficiently executed within the database itself, and large-scale data migration is better done using the bulk data export/import tools associated with your database. But when analysis of ad hoc data in HDFS can be improved by the addition of some additional relational data, DBInputFormat allows you to quickly perform the join without a large amount of setup overhead. DBOutputFormat then allows you to export results back to the same database for combining with other database-driven tables.
DBInputFormat is available in Hadoop 0.19 and is provided by HADOOP-2536, a patch started by Fredrik Hedberg and further developed by Enis Soztutar. A backport of this patch that can be applied to Hadoop 0.18.3 is available at the above link.
This article is based on a talk I gave at the SF Bay Hadoop User Group meetup on February 18th; the slides from that talk are available as a PDF.
Resources

DBInputFormat documentation
Wikipedia on JDBC

Popular JDBC drivers:
MySQL: Connector/J
PostgreSQL JDBC
Oracle JDBC (Note: DBInputFormat currently does not work with Oracle, but this should change soon.)

Hadoop Connectors for Analytical Databases

Source: TechTarget China
http://www.dedecms.com/knowledge/data-base/generalized/2012/0703/2509.html

This article surveys the Hadoop adapters offered by several databases:
Although AsterData and Greenplum both announced, around August 2008, that they were the first vendor to implement MapReduce at the database level, the two have since taken rather different paths. AsterData chose to leave the MapReduce layer to Hadoop, and a year later, in October 2009, announced the first Hadoop connector, built in partnership with Cloudera. The connector's main features are:
Bidirectional connection: Hadoop and AsterData can keep data flowing to each other in sync.
SQL-MapReduce: hand-written MapReduce code can be invoked directly from SQL.
MapReduce execution uses as much memory as possible, in particular keeping intermediate results off disk wherever it can.
Greenplum completed its first Hadoop adapter in September of this year, also developed with Cloudera, but it is one-way, loading data from HDFS into Greenplum, still in parallel across all cluster nodes. It mostly relies on the hardware to carry the load and is not heavily optimized in itself, though its speed is acceptable. Another characteristic is that the adapter is integrated with Greenplum's Chorus as an important part of data lifecycle management.
Vertica built a Vertica-Hadoop integration adapter with Cloudera in October 2009. At first it only supported moving data from Vertica into Hadoop; a year later it added loading from HDFS into Vertica, making the connection bidirectional. Notably, the Vertica adapter has two striking capabilities. One is that SQL issued from a Vertica client can be directed to run on the Hadoop cluster, much like AsterData's SQL-MapReduce; the process is transparent, with the adapter automatically translating the SQL into HiveQL, executing it, and returning the result. The other is that its Hadoop connector reads Vertica's file format and metadata directly, so the data is stored only once, unlike other connectors that effectively keep two copies and leave it to you to keep them in sync. (More precisely, data inside Vertica can be configured with mirrors to improve I/O parallelism and availability, and one or two of the HDFS replicas depend directly on the Vertica file system, whereas for row-store databases such as Greenplum and AsterData the copies inside the DBMS and inside Hadoop are entirely separate.)
Calpont's InfiniteDB, an open-source column-store database based on MySQL, also completed bidirectional integration between Hadoop and InfiniteDB in September of this year. It simply implements Hadoop's DBInputFormat and DBOutputFormat classes, so the synchronization presumably has to be done manually (a guess), and the data is presumably also stored only once (another guess).
Microsoft's SQL Server also recently released its first Hadoop adapter, with help from Hortonworks, the company spun out of Yahoo; it likewise moves data from HDFS into SQL Server. Microsoft originally had a MapReduce solution of its own, built by a crowd of database heavyweights, but short of being, like Google, a generation ahead in everything, Microsoft cannot stand against an ecosystem of Hadoop's current scale. After all, it wants SQL Server and the Azure platform to interoperate with the rest of the industry; otherwise, where would it find people to write code for a proprietary programming model of its own?
Moving data from a row-store database into Hadoop can be done by implementing Hadoop's DBInputFormat yourself or by using a JDBC-based tool such as Cloudera's Sqoop. But this kind of import is not very efficient, because it does not load in parallel across all cluster nodes.
Column-store databases seem to find it easier to build bidirectional Hadoop connectors, and such a connector does not need to copy data back and forth: a single copy of the data is enough, because at query time the column store and Hadoop can readily parse each other's file formats, something row-store databases usually find hard to do.
IBM's Netezza also produced a Hadoop-Netezza connector in June of this year in partnership with Cloudera; there is nothing particularly noteworthy about it. It is available through Cloudera.
Teradata first supported reading from HDFS at the end of 2009; like Netezza, it parsed the HDFS files into raw text and then loaded them into Teradata with the database's own parallel loading tools. A year later, again in partnership with Cloudera, it developed a Hadoop adapter that can load Teradata data into Hadoop in parallel. There is, however, no publicly available TD-Hadoop adapter at present; you have to buy the enterprise edition of Teradata or Cloudera and ask them for it. At SIGMOD this July, Teradata published a paper, "A Hadoop based distributed loading approach to parallel data warehouses," which shows how, given that both Teradata and Hadoop have their own default replication mechanisms, data loading can be parallelized and sped up while avoiding unnecessary data transfer. That is clearly far more capable than the mechanism behind the default Hadoop adapter.
Oracle's Hadoop connector appears to have been built first by Quest, a company that provides peripheral tools and solutions for many database vendors; many Oracle DBAs will know Quest's excellent database tuning tools. The first product, OraHive, was released in June of this year; it is a one-way path that pulls data from Hadoop into Oracle, and, as the name suggests, it works by executing Hive statements rather than reading HDFS directly as other vendors do. OraOop, released in October of this year, moves data from Oracle into Hadoop using Cloudera's database import tool Sqoop. Oracle also announced its own Hadoop integration solution at OpenWorld on October 4 of this year, but frankly, for data warehousing Oracle is neither a cheap storage solution nor a high-performance computing solution, and no amount of struggling will make it representative of where data warehousing is heading. It is genuinely puzzling that it did not acquire Netezza, AsterData, Greenplum, or Vertica, four companies that very much represent that future.
Sybase also announced Sybase IQ 15.4 this month, due to ship around next month; it too mentions a Hadoop connector, including a Java MapReduce interface and a C++ extension. It is not clear whether it is bidirectional or one-way.