/** * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information * regarding copyright ownership. The ASF licenses this file * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ package org.apache.hadoop.examples; import java.io.IOException; import java.util.ArrayList; import java.util.Date; import java.util.List; import java.util.Random; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configured; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.BytesWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.io.Writable; import org.apache.hadoop.io.WritableComparable; import org.apache.hadoop.mapred.ClusterStatus; import org.apache.hadoop.mapred.JobClient; import org.apache.hadoop.mapreduce.*; import org.apache.hadoop.mapreduce.lib.input.FileSplit; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat; import org.apache.hadoop.util.GenericOptionsParser; import org.apache.hadoop.util.Tool; import org.apache.hadoop.util.ToolRunner; /** * This program uses map/reduce to just run a distributed job where there is * no interaction between the tasks and each task write a large unsorted * random binary sequence file of BytesWritable. * In order for this program to generate data for terasort with 10-byte keys * and 90-byte values, have the following config: *
{@code
 * 
 * 
 * 
 *   
 *     mapreduce.randomwriter.minkey
 *     10
 *   
 *   
 *     mapreduce.randomwriter.maxkey
 *     10
 *   
 *   
 *     mapreduce.randomwriter.minvalue
 *     90
 *   
 *   
 *     mapreduce.randomwriter.maxvalue
 *     90
 *   
 *   
 *     mapreduce.randomwriter.totalbytes
 *     1099511627776
 *   
 * }
* Equivalently, {@link RandomWriter} also supports all the above options * and ones supported by {@link GenericOptionsParser} via the command-line. */ public class RandomWriter extends Configured implements Tool { public static final String TOTAL_BYTES = "mapreduce.randomwriter.totalbytes"; public static final String BYTES_PER_MAP = "mapreduce.randomwriter.bytespermap"; public static final String MAPS_PER_HOST = "mapreduce.randomwriter.mapsperhost"; public static final String MAX_VALUE = "mapreduce.randomwriter.maxvalue"; public static final String MIN_VALUE = "mapreduce.randomwriter.minvalue"; public static final String MIN_KEY = "mapreduce.randomwriter.minkey"; public static final String MAX_KEY = "mapreduce.randomwriter.maxkey"; /** * User counters */ static enum Counters { RECORDS_WRITTEN, BYTES_WRITTEN } /** * A custom input format that creates virtual inputs of a single string * for each map. */ static class RandomInputFormat extends InputFormat { /** * Generate the requested number of file splits, with the filename * set to the filename of the output file. */ public List getSplits(JobContext job) throws IOException { List result = new ArrayList(); Path outDir = FileOutputFormat.getOutputPath(job); int numSplits = job.getConfiguration().getInt(MRJobConfig.NUM_MAPS, 1); for(int i=0; i < numSplits; ++i) { result.add(new FileSplit(new Path(outDir, "dummy-split-" + i), 0, 1, (String[])null)); } return result; } /** * Return a single record (filename, "") where the filename is taken from * the file split. */ static class RandomRecordReader extends RecordReader { Path name; Text key = null; Text value = new Text(); public RandomRecordReader(Path p) { name = p; } public void initialize(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException { } public boolean nextKeyValue() { if (name != null) { key = new Text(); key.set(name.getName()); name = null; return true; } return false; } public Text getCurrentKey() { return key; } public Text getCurrentValue() { return value; } public void close() {} public float getProgress() { return 0.0f; } } public RecordReader createRecordReader(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException { return new RandomRecordReader(((FileSplit) split).getPath()); } } static class RandomMapper extends Mapper { private long numBytesToWrite; private int minKeySize; private int keySizeRange; private int minValueSize; private int valueSizeRange; private Random random = new Random(); private BytesWritable randomKey = new BytesWritable(); private BytesWritable randomValue = new BytesWritable(); private void randomizeBytes(byte[] data, int offset, int length) { for(int i=offset + length - 1; i >= offset; --i) { data[i] = (byte) random.nextInt(256); } } /** * Given an output filename, write a bunch of random records to it. */ public void map(WritableComparable key, Writable value, Context context) throws IOException,InterruptedException { int itemCount = 0; while (numBytesToWrite > 0) { int keyLength = minKeySize + (keySizeRange != 0 ? random.nextInt(keySizeRange) : 0); randomKey.setSize(keyLength); randomizeBytes(randomKey.getBytes(), 0, randomKey.getLength()); int valueLength = minValueSize + (valueSizeRange != 0 ? random.nextInt(valueSizeRange) : 0); randomValue.setSize(valueLength); randomizeBytes(randomValue.getBytes(), 0, randomValue.getLength()); context.write(randomKey, randomValue); numBytesToWrite -= keyLength + valueLength; context.getCounter(Counters.BYTES_WRITTEN).increment(keyLength + valueLength); context.getCounter(Counters.RECORDS_WRITTEN).increment(1); if (++itemCount % 200 == 0) { context.setStatus("wrote record " + itemCount + ". " + numBytesToWrite + " bytes left."); } } context.setStatus("done with " + itemCount + " records."); } /** * Save the values out of the configuaration that we need to write * the data. */ @Override public void setup(Context context) { Configuration conf = context.getConfiguration(); numBytesToWrite = conf.getLong(BYTES_PER_MAP, 1*1024*1024*1024); minKeySize = conf.getInt(MIN_KEY, 10); keySizeRange = conf.getInt(MAX_KEY, 1000) - minKeySize; minValueSize = conf.getInt(MIN_VALUE, 0); valueSizeRange = conf.getInt(MAX_VALUE, 20000) - minValueSize; } } /** * This is the main routine for launching a distributed random write job. * It runs 10 maps/node and each node writes 1 gig of data to a DFS file. * The reduce doesn't do anything. * * @throws IOException */ public int run(String[] args) throws Exception { if (args.length == 0) { System.out.println("Usage: writer "); ToolRunner.printGenericCommandUsage(System.out); return 2; } Path outDir = new Path(args[0]); Configuration conf = getConf(); JobClient client = new JobClient(conf); ClusterStatus cluster = client.getClusterStatus(); int numMapsPerHost = conf.getInt(MAPS_PER_HOST, 10); long numBytesToWritePerMap = conf.getLong(BYTES_PER_MAP, 1*1024*1024*1024); if (numBytesToWritePerMap == 0) { System.err.println("Cannot have" + BYTES_PER_MAP + " set to 0"); return -2; } long totalBytesToWrite = conf.getLong(TOTAL_BYTES, numMapsPerHost*numBytesToWritePerMap*cluster.getTaskTrackers()); int numMaps = (int) (totalBytesToWrite / numBytesToWritePerMap); if (numMaps == 0 && totalBytesToWrite > 0) { numMaps = 1; conf.setLong(BYTES_PER_MAP, totalBytesToWrite); } conf.setInt(MRJobConfig.NUM_MAPS, numMaps); Job job = Job.getInstance(conf); job.setJarByClass(RandomWriter.class); job.setJobName("random-writer"); FileOutputFormat.setOutputPath(job, outDir); job.setOutputKeyClass(BytesWritable.class); job.setOutputValueClass(BytesWritable.class); job.setInputFormatClass(RandomInputFormat.class); job.setMapperClass(RandomMapper.class); job.setReducerClass(Reducer.class); job.setOutputFormatClass(SequenceFileOutputFormat.class); System.out.println("Running " + numMaps + " maps."); // reducer NONE job.setNumReduceTasks(0); Date startTime = new Date(); System.out.println("Job started: " + startTime); int ret = job.waitForCompletion(true) ? 0 : 1; Date endTime = new Date(); System.out.println("Job ended: " + endTime); System.out.println("The job took " + (endTime.getTime() - startTime.getTime()) /1000 + " seconds."); return ret; } public static void main(String[] args) throws Exception { int res = ToolRunner.run(new Configuration(), new RandomWriter(), args); System.exit(res); } }