/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.examples;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.Iterator;
import java.util.Random;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.db.DBConfiguration;
import org.apache.hadoop.mapreduce.lib.db.DBInputFormat;
import org.apache.hadoop.mapreduce.lib.db.DBOutputFormat;
import org.apache.hadoop.mapreduce.lib.db.DBWritable;
import org.apache.hadoop.mapreduce.lib.reduce.LongSumReducer;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.hsqldb.server.Server;
/**
* This is a demonstrative program, which uses DBInputFormat for reading
* the input data from a database, and DBOutputFormat for writing the data
* to the database.
*
* The Program first creates the necessary tables, populates the input table
* and runs the mapred job.
*
* The input data is a mini access log, with a <url,referrer,time>
* schema.The output is the number of pageviews of each url in the log,
* having the schema <url,pageview>.
*
* When called with no arguments the program starts a local HSQLDB server, and
* uses this database for storing/retrieving the data.
*
* This program requires some additional configuration relating to HSQLDB.
* The the hsqldb jar should be added to the classpath:
*
* export HADOOP_CLASSPATH=share/hadoop/mapreduce/lib-examples/hsqldb-2.0.0.jar
*
* And the hsqldb jar should be included with the -libjars
* argument when executing it with hadoop:
*
* -libjars share/hadoop/mapreduce/lib-examples/hsqldb-2.0.0.jar
*/
public class DBCountPageView extends Configured implements Tool {
private static final Log LOG = LogFactory.getLog(DBCountPageView.class);
private Connection connection;
private boolean initialized = false;
private static final String[] AccessFieldNames = {"url", "referrer", "time"};
private static final String[] PageviewFieldNames = {"url", "pageview"};
private static final String DB_URL =
"jdbc:hsqldb:hsql://localhost/URLAccess";
private static final String DRIVER_CLASS = "org.hsqldb.jdbc.JDBCDriver";
private Server server;
private void startHsqldbServer() {
server = new Server();
server.setDatabasePath(0,
System.getProperty("test.build.data", "/tmp") + "/URLAccess");
server.setDatabaseName(0, "URLAccess");
server.start();
}
private void createConnection(String driverClassName
, String url) throws Exception {
Class.forName(driverClassName);
connection = DriverManager.getConnection(url);
connection.setAutoCommit(false);
}
private void shutdown() {
try {
connection.commit();
connection.close();
}catch (Throwable ex) {
LOG.warn("Exception occurred while closing connection :"
+ StringUtils.stringifyException(ex));
} finally {
try {
if(server != null) {
server.shutdown();
}
}catch (Throwable ex) {
LOG.warn("Exception occurred while shutting down HSQLDB :"
+ StringUtils.stringifyException(ex));
}
}
}
private void initialize(String driverClassName, String url)
throws Exception {
if(!this.initialized) {
if(driverClassName.equals(DRIVER_CLASS)) {
startHsqldbServer();
}
createConnection(driverClassName, url);
dropTables();
createTables();
populateAccess();
this.initialized = true;
}
}
private void dropTables() {
String dropAccess = "DROP TABLE Access";
String dropPageview = "DROP TABLE Pageview";
Statement st = null;
try {
st = connection.createStatement();
st.executeUpdate(dropAccess);
st.executeUpdate(dropPageview);
connection.commit();
st.close();
}catch (SQLException ex) {
try { if (st != null) { st.close(); } } catch (Exception e) {}
}
}
private void createTables() throws SQLException {
String createAccess =
"CREATE TABLE " +
"Access(url VARCHAR(100) NOT NULL," +
" referrer VARCHAR(100)," +
" time BIGINT NOT NULL, " +
" PRIMARY KEY (url, time))";
String createPageview =
"CREATE TABLE " +
"Pageview(url VARCHAR(100) NOT NULL," +
" pageview BIGINT NOT NULL, " +
" PRIMARY KEY (url))";
Statement st = connection.createStatement();
try {
st.executeUpdate(createAccess);
st.executeUpdate(createPageview);
connection.commit();
} finally {
st.close();
}
}
/**
* Populates the Access table with generated records.
*/
private void populateAccess() throws SQLException {
PreparedStatement statement = null ;
try {
statement = connection.prepareStatement(
"INSERT INTO Access(url, referrer, time)" +
" VALUES (?, ?, ?)");
Random random = new Random();
int time = random.nextInt(50) + 50;
final int PROBABILITY_PRECISION = 100; // 1 / 100
final int NEW_PAGE_PROBABILITY = 15; // 15 / 100
//Pages in the site :
String[] pages = {"/a", "/b", "/c", "/d", "/e",
"/f", "/g", "/h", "/i", "/j"};
//linkMatrix[i] is the array of pages(indexes) that page_i links to.
int[][] linkMatrix = {{1,5,7}, {0,7,4,6,}, {0,1,7,8},
{0,2,4,6,7,9}, {0,1}, {0,3,5,9}, {0}, {0,1,3}, {0,2,6}, {0,2,6}};
//a mini model of user browsing a la pagerank
int currentPage = random.nextInt(pages.length);
String referrer = null;
for(int i=0; i