/**
* Generate the list of files and make them into FileSplits.
* @param job the job context
* @throws IOException
*/
public List<InputSplit> getSplits(JobContext job) throws IOException {
// Start a stopwatch to measure how long split generation takes
StopWatch sw = new StopWatch().start();
// Minimum split size: max(getFormatMinSplitSize(), getMinSplitSize(job))
// = max(1, 1) = 1 by default
long minSize = Math.max(getFormatMinSplitSize(), getMinSplitSize(job));
// Maximum split size
// 9223372036854775807 (Long.MAX_VALUE) by default
long maxSize = getMaxSplitSize(job);
// generate splits
// List that collects the resulting splits
List<InputSplit> splits = new ArrayList<InputSplit>();
// List the details of every file the job has to process, based on its input paths
List<FileStatus> files = listStatus(job);
// Walk over every file and compute its splits
for (FileStatus file: files) {
// File path
Path path = file.getPath();
// File length in bytes
long length = file.getLen();
// Non-empty file
if (length != 0) {
// Array that will hold the file's block locations
BlockLocation[] blkLocations;
// Does the FileStatus already carry block locations?
if (file instanceof LocatedFileStatus) {
// If so, cast it to LocatedFileStatus
// and reuse the block locations it already carries
blkLocations = ((LocatedFileStatus) file).getBlockLocations();
} else {
// Otherwise the status carries no block info,
// so ask the FileSystem for the file's block locations
FileSystem fs = path.getFileSystem(job.getConfiguration());
blkLocations = fs.getFileBlockLocations(file, 0, length);
}
// Can this file be split?
if (isSplitable(job, path)) {
// Block size, e.g. 134217728 (128 MB)
long blockSize = file.getBlockSize();
// computeSplitSize(blockSize=134217728, minSize=1, maxSize=9223372036854775807) = 134217728
// i.e. by default the split size equals the block size
long splitSize = computeSplitSize(blockSize, minSize, maxSize);
// bytesRemaining tracks the part of the file not yet assigned to a split;
// it starts out as the whole file length
long bytesRemaining = length;
// Example: 209715200 / 134217728 (200 MB / 128 MB) = 1.56 > 1.1, so a full split is cut;
// the remaining 72 MB / 128 MB = 0.56 <= 1.1, so it is handled as the tail split below
while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) {
// Index of the block containing the split's start offset
int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining);
// Create the split (e.g. offset 0, length 128 MB) and add it to the list
splits.add(makeSplit(path, length-bytesRemaining, splitSize,
blkLocations[blkIndex].getHosts(),
blkLocations[blkIndex].getCachedHosts()));
// Deduct the split just created from the remaining bytes
bytesRemaining -= splitSize;
}
if (bytesRemaining != 0) {
// Turn whatever is left (the tail of the file) into the final split
int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining);
splits.add(makeSplit(path, length-bytesRemaining, bytesRemaining,
blkLocations[blkIndex].getHosts(),
blkLocations[blkIndex].getCachedHosts()));
}
} else { // not splitable
splits.add(makeSplit(path, 0, length, blkLocations[0].getHosts(),
blkLocations[0].getCachedHosts()));
}
} else {
//Create empty hosts array for zero length files
// Zero-length file: create a single empty split with an empty host array
splits.add(makeSplit(path, 0, length, new String[0]));
}
}
// Save the number of input files for metrics/loadgen
// i.e. record files.size(), the number of input files, not the number of splits
job.getConfiguration().setLong(NUM_INPUT_FILES, files.size());
sw.stop();
if (LOG.isDebugEnabled()) {
LOG.debug("Total # of splits generated by getSplits: " + splits.size()
+ ", TimeTaken: " + sw.now(TimeUnit.MILLISECONDS));
}
return splits;
}
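To make the slop arithmetic above concrete, here is a small standalone sketch (class and variable names are hypothetical, not part of Hadoop) that reproduces the 200 MB example from the comments: with the default 128 MB block size the file yields one 128 MB split plus a 72 MB tail split, because 72 MB / 128 MB = 0.56 is below the 1.1 slop threshold.

public class SplitMathDemo {
    // Same 10% slop constant as FileInputFormat.SPLIT_SLOP
    private static final double SPLIT_SLOP = 1.1;

    // Mirrors FileInputFormat.computeSplitSize()
    static long computeSplitSize(long blockSize, long minSize, long maxSize) {
        return Math.max(minSize, Math.min(maxSize, blockSize));
    }

    public static void main(String[] args) {
        long length = 200L * 1024 * 1024;        // 209715200 bytes, the 200 MB file
        long blockSize = 128L * 1024 * 1024;     // 134217728 bytes, default HDFS block size
        long splitSize = computeSplitSize(blockSize, 1L, Long.MAX_VALUE); // 134217728

        long bytesRemaining = length;
        int numSplits = 0;
        while (((double) bytesRemaining) / splitSize > SPLIT_SLOP) {
            numSplits++;                         // cut a full 128 MB split
            bytesRemaining -= splitSize;
        }
        if (bytesRemaining != 0) {
            numSplits++;                         // the 72 MB tail becomes the last split
        }
        System.out.println(numSplits);           // prints 2
    }
}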
FileInputFormat source code
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.mapreduce.lib.output;
/*import com.google.common.collect.Lists;*/
import com.clearspring.analytics.util.Lists;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.*;
import org.apache.hadoop.mapred.LocatedFileStatusFetcher;
import org.apache.hadoop.mapreduce.*;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.input.InvalidInputException;
import org.apache.hadoop.mapreduce.security.TokenCache;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.util.StopWatch;
import org.apache.hadoop.util.StringUtils;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;
/**
* A base class for file-based {@link InputFormat}s.
*
* <p><code>FileInputFormat</code> is the base class for all file-based
* <code>InputFormat</code>s. This provides a generic implementation of
* {@link #getSplits(JobContext)}.
* Subclasses of <code>FileInputFormat</code> can also override the
* {@link #isSplitable(JobContext, Path)} method to ensure input-files are
* not split-up and are processed as a whole by {@link Mapper}s.
*/
@InterfaceAudience.Public
@InterfaceStability.Stable
public abstract class FileInputFormat<K, V> extends InputFormat<K, V> {
public static final String INPUT_DIR =
"mapreduce.input.fileinputformat.inputdir";
public static final String SPLIT_MAXSIZE =
"mapreduce.input.fileinputformat.split.maxsize";
public static final String SPLIT_MINSIZE =
"mapreduce.input.fileinputformat.split.minsize";
public static final String PATHFILTER_CLASS =
"mapreduce.input.pathFilter.class";
public static final String NUM_INPUT_FILES =
"mapreduce.input.fileinputformat.numinputfiles";
public static final String INPUT_DIR_RECURSIVE =
"mapreduce.input.fileinputformat.input.dir.recursive";
public static final String LIST_STATUS_NUM_THREADS =
"mapreduce.input.fileinputformat.list-status.num-threads";
public static final int DEFAULT_LIST_STATUS_NUM_THREADS = 1;
private static final Log LOG = LogFactory.getLog(FileInputFormat.class);
private static final double SPLIT_SLOP = 1.1; // 10% slop
@Deprecated
public static enum Counter {
BYTES_READ
}
private static final PathFilter hiddenFileFilter = new PathFilter(){
public boolean accept(Path p){
String name = p.getName();
return !name.startsWith("_") && !name.startsWith(".");
}
};
/**
* Proxy PathFilter that accepts a path only if all filters given in the
* constructor do. Used by the listPaths() to apply the built-in
* hiddenFileFilter together with a user provided one (if any).
*/
private static class MultiPathFilter implements PathFilter {
private List<PathFilter> filters;
public MultiPathFilter(List<PathFilter> filters) {
this.filters = filters;
}
public boolean accept(Path path) {
for (PathFilter filter : filters) {
if (!filter.accept(path)) {
return false;
}
}
return true;
}
}
/**
* @param job
* the job to modify
* @param inputDirRecursive
*/
public static void setInputDirRecursive(Job job,
boolean inputDirRecursive) {
job.getConfiguration().setBoolean(INPUT_DIR_RECURSIVE,
inputDirRecursive);
}
/**
* @param job
* the job to look at.
* @return should the files to be read recursively?
*/
public static boolean getInputDirRecursive(JobContext job) {
return job.getConfiguration().getBoolean(INPUT_DIR_RECURSIVE,
false);
}
/**
* Get the lower bound on split size imposed by the format.
* @return the number of bytes of the minimal split for this format
*/
protected long getFormatMinSplitSize() {
return 1;
}
/**
* Is the given filename splitable? Usually, true, but if the file is
* stream compressed, it will not be.
*
* <code>FileInputFormat</code> implementations can override this and return
* <code>false</code> to ensure that individual input files are never split-up
* so that {@link Mapper}s process entire files.
*
* @param context the job context
* @param filename the file name to check
* @return is this file splitable?
*/
protected boolean isSplitable(JobContext context, Path filename) {
return true;
}
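// Usage sketch (not part of this class; the names below are illustrative): a
// subclass can force whole-file processing by overriding isSplitable(), so
// every input file is handled by exactly one mapper regardless of its size:
//
//   public class WholeFileInputFormat extends TextInputFormat {
//     @Override
//     protected boolean isSplitable(JobContext context, Path filename) {
//       return false;   // never split; one split (and one mapper) per file
//     }
//   }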
/**
* Set a PathFilter to be applied to the input paths for the map-reduce job.
* @param job the job to modify
* @param filter the PathFilter class use for filtering the input paths.
*/
public static void setInputPathFilter(Job job,
Class<? extends PathFilter> filter) {
job.getConfiguration().setClass(PATHFILTER_CLASS, filter,
PathFilter.class);
}
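// Usage sketch (illustrative, not part of this class): a filter registered via
// setInputPathFilter() is combined with the built-in hiddenFileFilter by
// MultiPathFilter inside listStatus(). The filter class needs a no-argument
// constructor because it is instantiated through ReflectionUtils.newInstance().
//
//   public class SkipTmpFilter implements PathFilter {
//     @Override
//     public boolean accept(Path path) {
//       return !path.getName().endsWith(".tmp");   // drop temporary files
//     }
//   }
//
//   // in the driver:
//   FileInputFormat.setInputPathFilter(job, SkipTmpFilter.class);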
/**
* Set the minimum input split size
* @param job the job to modify
* @param size the minimum size
*/
public static void setMinInputSplitSize(Job job,
long size) {
job.getConfiguration().setLong(SPLIT_MINSIZE, size);
}
/**
* Get the minimum split size
* @param job the job
* @return the minimum number of bytes that can be in a split
*/
public static long getMinSplitSize(JobContext job) {
return job.getConfiguration().getLong(SPLIT_MINSIZE, 1L);
}
/**
* Set the maximum split size
* @param job the job to modify
* @param size the maximum split size
*/
public static void setMaxInputSplitSize(Job job,
long size) {
job.getConfiguration().setLong(SPLIT_MAXSIZE, size);
}
/**
* Get the maximum split size.
* @param context the job to look at.
* @return the maximum number of bytes a split can include
*/
public static long getMaxSplitSize(JobContext context) {
return context.getConfiguration().getLong(SPLIT_MAXSIZE,
Long.MAX_VALUE);
}
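// Driver-side sketch (assumes a Job named "job" already exists): because the
// effective split size is max(minSize, min(maxSize, blockSize)), see
// computeSplitSize() below, these two settings can push the split size above
// or below the 128 MB block size:
//
//   // fewer, larger splits: max(256M, min(Long.MAX_VALUE, 128M)) = 256 MB
//   FileInputFormat.setMinInputSplitSize(job, 256L * 1024 * 1024);
//
//   // more, smaller splits: max(1, min(64M, 128M)) = 64 MB
//   FileInputFormat.setMaxInputSplitSize(job, 64L * 1024 * 1024);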
/**
* Get a PathFilter instance of the filter set for the input paths.
*
* @return the PathFilter instance set for the job, NULL if none has been set.
*/
public static PathFilter getInputPathFilter(JobContext context) {
Configuration conf = context.getConfiguration();
Class<?> filterClass = conf.getClass(PATHFILTER_CLASS, null,
PathFilter.class);
return (filterClass != null) ?
(PathFilter) ReflectionUtils.newInstance(filterClass, conf) : null;
}
/** List input directories.
* Subclasses may override to, e.g., select only files matching a regular
* expression.
*
* @param job the job to list input paths for
* @return array of FileStatus objects
* @throws IOException if zero items.
*/
protected List<FileStatus> listStatus(JobContext job
) throws IOException {
Path[] dirs = getInputPaths(job);
if (dirs.length == 0) {
throw new IOException("No input paths specified in job");
}
// get tokens for all the required FileSystems..
TokenCache.obtainTokensForNamenodes(job.getCredentials(), dirs,
job.getConfiguration());
// Whether we need to recursive look into the directory structure
boolean recursive = getInputDirRecursive(job);
// creates a MultiPathFilter with the hiddenFileFilter and the
// user provided one (if any).
List<PathFilter> filters = new ArrayList<PathFilter>();
filters.add(hiddenFileFilter);
PathFilter jobFilter = getInputPathFilter(job);
if (jobFilter != null) {
filters.add(jobFilter);
}
PathFilter inputFilter = new MultiPathFilter(filters);
List<FileStatus> result = null;
int numThreads = job.getConfiguration().getInt(LIST_STATUS_NUM_THREADS,
DEFAULT_LIST_STATUS_NUM_THREADS);
StopWatch sw = new StopWatch().start();
if (numThreads == 1) {
result = singleThreadedListStatus(job, dirs, inputFilter, recursive);
} else {
Iterable<FileStatus> locatedFiles = null;
try {
LocatedFileStatusFetcher locatedFileStatusFetcher = new LocatedFileStatusFetcher(
job.getConfiguration(), dirs, recursive, inputFilter, true);
locatedFiles = locatedFileStatusFetcher.getFileStatuses();
} catch (InterruptedException e) {
throw new IOException("Interrupted while getting file statuses");
}
result = Lists.newArrayList(locatedFiles);
}
sw.stop();
if (LOG.isDebugEnabled()) {
LOG.debug("Time taken to get FileStatuses: "
+ sw.now(TimeUnit.MILLISECONDS));
}
LOG.info("Total input paths to process : " + result.size());
return result;
}
private List<FileStatus> singleThreadedListStatus(JobContext job, Path[] dirs,
PathFilter inputFilter, boolean recursive) throws IOException {
List<FileStatus> result = new ArrayList<FileStatus>();
List<IOException> errors = new ArrayList<IOException>();
for (int i=0; i < dirs.length; ++i) {
Path p = dirs[i];
FileSystem fs = p.getFileSystem(job.getConfiguration());
FileStatus[] matches = fs.globStatus(p, inputFilter);
if (matches == null) {
errors.add(new IOException("Input path does not exist: " + p));
} else if (matches.length == 0) {
errors.add(new IOException("Input Pattern " + p + " matches 0 files"));
} else {
for (FileStatus globStat: matches) {
if (globStat.isDirectory()) {
RemoteIterator<LocatedFileStatus> iter =
fs.listLocatedStatus(globStat.getPath());
while (iter.hasNext()) {
LocatedFileStatus stat = iter.next();
if (inputFilter.accept(stat.getPath())) {
if (recursive && stat.isDirectory()) {
addInputPathRecursively(result, fs, stat.getPath(),
inputFilter);
} else {
result.add(stat);
}
}
}
} else {
result.add(globStat);
}
}
}
}
if (!errors.isEmpty()) {
throw new InvalidInputException(errors);
}
return result;
}
/**
* Add files in the input path recursively into the results.
* @param result
* The List to store all files.
* @param fs
* The FileSystem.
* @param path
* The input path.
* @param inputFilter
* The input filter that can be used to filter files/dirs.
* @throws IOException
*/
protected void addInputPathRecursively(List<FileStatus> result,
FileSystem fs, Path path, PathFilter inputFilter)
throws IOException {
RemoteIterator<LocatedFileStatus> iter = fs.listLocatedStatus(path);
while (iter.hasNext()) {
LocatedFileStatus stat = iter.next();
if (inputFilter.accept(stat.getPath())) {
if (stat.isDirectory()) {
addInputPathRecursively(result, fs, stat.getPath(), inputFilter);
} else {
result.add(stat);
}
}
}
}
/**
* A factory that makes the split for this class. It can be overridden
* by sub-classes to make sub-types
*/
protected FileSplit makeSplit(Path file, long start, long length,
String[] hosts) {
return new FileSplit(file, start, length, hosts);
}
/**
* A factory that makes the split for this class. It can be overridden
* by sub-classes to make sub-types
*/
protected FileSplit makeSplit(Path file, long start, long length,
String[] hosts, String[] inMemoryHosts) {
return new FileSplit(file, start, length, hosts, inMemoryHosts);
}
/**
* Generate the list of files and make them into FileSplits.
* @param job the job context
* @throws IOException
*/
public List<InputSplit> getSplits(JobContext job) throws IOException {
// Start a stopwatch to measure how long split generation takes
StopWatch sw = new StopWatch().start();
// Minimum split size: max(getFormatMinSplitSize(), getMinSplitSize(job))
// = max(1, 1) = 1 by default
long minSize = Math.max(getFormatMinSplitSize(), getMinSplitSize(job));
// Maximum split size
// 9223372036854775807 (Long.MAX_VALUE) by default
long maxSize = getMaxSplitSize(job);
// generate splits
// List that collects the resulting splits
List<InputSplit> splits = new ArrayList<InputSplit>();
// List the details of every file the job has to process, based on its input paths
List<FileStatus> files = listStatus(job);
// Walk over every file and compute its splits
for (FileStatus file: files) {
// File path
Path path = file.getPath();
// File length in bytes
long length = file.getLen();
// Non-empty file
if (length != 0) {
// Array that will hold the file's block locations
BlockLocation[] blkLocations;
// Does the FileStatus already carry block locations?
if (file instanceof LocatedFileStatus) {
// If so, cast it to LocatedFileStatus
// and reuse the block locations it already carries
blkLocations = ((LocatedFileStatus) file).getBlockLocations();
} else {
// Otherwise the status carries no block info,
// so ask the FileSystem for the file's block locations
FileSystem fs = path.getFileSystem(job.getConfiguration());
blkLocations = fs.getFileBlockLocations(file, 0, length);
}
// Can this file be split?
if (isSplitable(job, path)) {
// Block size, e.g. 134217728 (128 MB)
long blockSize = file.getBlockSize();
// computeSplitSize(blockSize=134217728, minSize=1, maxSize=9223372036854775807) = 134217728
// i.e. by default the split size equals the block size
long splitSize = computeSplitSize(blockSize, minSize, maxSize);
// bytesRemaining tracks the part of the file not yet assigned to a split;
// it starts out as the whole file length
long bytesRemaining = length;
// Example: 209715200 / 134217728 (200 MB / 128 MB) = 1.56 > 1.1, so a full split is cut;
// the remaining 72 MB / 128 MB = 0.56 <= 1.1, so it is handled as the tail split below
while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) {
// Index of the block containing the split's start offset
int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining);
// Create the split (e.g. offset 0, length 128 MB) and add it to the list
splits.add(makeSplit(path, length-bytesRemaining, splitSize,
blkLocations[blkIndex].getHosts(),
blkLocations[blkIndex].getCachedHosts()));
// Deduct the split just created from the remaining bytes
bytesRemaining -= splitSize;
}
if (bytesRemaining != 0) {
// Turn whatever is left (the tail of the file) into the final split
int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining);
splits.add(makeSplit(path, length-bytesRemaining, bytesRemaining,
blkLocations[blkIndex].getHosts(),
blkLocations[blkIndex].getCachedHosts()));
}
} else { // not splitable
splits.add(makeSplit(path, 0, length, blkLocations[0].getHosts(),
blkLocations[0].getCachedHosts()));
}
} else {
//Create empty hosts array for zero length files
// Zero-length file: create a single empty split with an empty host array
splits.add(makeSplit(path, 0, length, new String[0]));
}
}
// Save the number of input files for metrics/loadgen
// i.e. record files.size(), the number of input files, not the number of splits
job.getConfiguration().setLong(NUM_INPUT_FILES, files.size());
sw.stop();
if (LOG.isDebugEnabled()) {
LOG.debug("Total # of splits generated by getSplits: " + splits.size()
+ ", TimeTaken: " + sw.now(TimeUnit.MILLISECONDS));
}
return splits;
}
protected long computeSplitSize(long blockSize, long minSize,
long maxSize) {
return Math.max(minSize, Math.min(maxSize, blockSize));
}
protected int getBlockIndex(BlockLocation[] blkLocations,
long offset) {
for (int i = 0 ; i < blkLocations.length; i++) {
// is the offset inside this block?
if ((blkLocations[i].getOffset() <= offset) &&
(offset < blkLocations[i].getOffset() + blkLocations[i].getLength())){
return i;
}
}
BlockLocation last = blkLocations[blkLocations.length -1];
long fileLength = last.getOffset() + last.getLength() -1;
throw new IllegalArgumentException("Offset " + offset +
" is outside of file (0.." +
fileLength + ")");
}
/**
* Sets the given comma separated paths as the list of inputs
* for the map-reduce job.
*
* @param job the job
* @param commaSeparatedPaths Comma separated paths to be set as
* the list of inputs for the map-reduce job.
*/
public static void setInputPaths(Job job,
String commaSeparatedPaths
) throws IOException {
setInputPaths(job, StringUtils.stringToPath(
getPathStrings(commaSeparatedPaths)));
}
/**
* Add the given comma separated paths to the list of inputs for
* the map-reduce job.
*
* @param job The job to modify
* @param commaSeparatedPaths Comma separated paths to be added to
* the list of inputs for the map-reduce job.
*/
public static void addInputPaths(Job job,
String commaSeparatedPaths
) throws IOException {
for (String str : getPathStrings(commaSeparatedPaths)) {
addInputPath(job, new Path(str));
}
}
/**
* Set the array of {@link Path}s as the list of inputs
* for the map-reduce job.
*
* @param job The job to modify
* @param inputPaths the {@link Path}s of the input directories/files
* for the map-reduce job.
*/
public static void setInputPaths(Job job,
Path... inputPaths) throws IOException {
Configuration conf = job.getConfiguration();
Path path = inputPaths[0].getFileSystem(conf).makeQualified(inputPaths[0]);
StringBuffer str = new StringBuffer(StringUtils.escapeString(path.toString()));
for(int i = 1; i < inputPaths.length;i++) {
str.append(StringUtils.COMMA_STR);
path = inputPaths[i].getFileSystem(conf).makeQualified(inputPaths[i]);
str.append(StringUtils.escapeString(path.toString()));
}
conf.set(INPUT_DIR, str.toString());
}
/**
* Add a {@link Path} to the list of inputs for the map-reduce job.
*
* @param job The {@link Job} to modify
* @param path {@link Path} to be added to the list of inputs for
* the map-reduce job.
*/
public static void addInputPath(Job job,
Path path) throws IOException {
Configuration conf = job.getConfiguration();
path = path.getFileSystem(conf).makeQualified(path);
String dirStr = StringUtils.escapeString(path.toString());
String dirs = conf.get(INPUT_DIR);
conf.set(INPUT_DIR, dirs == null ? dirStr : dirs + "," + dirStr);
}
// This method escapes commas in the glob pattern of the given paths.
private static String[] getPathStrings(String commaSeparatedPaths) {
int length = commaSeparatedPaths.length();
int curlyOpen = 0;
int pathStart = 0;
boolean globPattern = false;
List<String> pathStrings = new ArrayList<String>();
for (int i=0; i<length; i++) {
char ch = commaSeparatedPaths.charAt(i);
switch(ch) {
case '{' : {
curlyOpen++;
if (!globPattern) {
globPattern = true;
}
break;
}
case '}' : {
curlyOpen--;
if (curlyOpen == 0 && globPattern) {
globPattern = false;
}
break;
}
case ',' : {
if (!globPattern) {
pathStrings.add(commaSeparatedPaths.substring(pathStart, i));
pathStart = i + 1 ;
}
break;
}
default:
continue; // nothing special to do for this character
}
}
pathStrings.add(commaSeparatedPaths.substring(pathStart, length));
return pathStrings.toArray(new String[0]);
}
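// Illustration of the brace handling above (hypothetical paths): the comma
// inside the {...} glob is part of the pattern and must not split the list,
// while the top-level comma separates two independent inputs:
//
//   FileInputFormat.addInputPaths(job, "/data/2023-{01,02},/logs");
//   // getPathStrings() returns ["/data/2023-{01,02}", "/logs"], which is
//   // equivalent to:
//   //   FileInputFormat.addInputPath(job, new Path("/data/2023-{01,02}"));
//   //   FileInputFormat.addInputPath(job, new Path("/logs"));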
/**
* Get the list of input {@link Path}s for the map-reduce job.
*
* @param context The job
* @return the list of input {@link Path}s for the map-reduce job.
*/
public static Path[] getInputPaths(JobContext context) {
String dirs = context.getConfiguration().get(INPUT_DIR, "");
String [] list = StringUtils.split(dirs);
Path[] result = new Path[list.length];
for (int i = 0; i < list.length; i++) {
result[i] = new Path(StringUtils.unEscapeString(list[i]));
}
return result;
}
}
FileOutputFormat source code
package org.apache.hadoop.mapreduce.lib.output;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.mapred.FileAlreadyExistsException;
import org.apache.hadoop.mapred.InvalidJobConfException;
import org.apache.hadoop.mapreduce.*;
import org.apache.hadoop.mapreduce.security.TokenCache;
import java.io.IOException;
import java.text.NumberFormat;
/** A base class for {@link OutputFormat}s that write to {@link FileSystem}s.*/
@InterfaceAudience.Public
@InterfaceStability.Stable
public abstract class FileOutputFormat<K, V> extends OutputFormat<K, V> {
/** Construct output file names so that, when an output directory listing is
* sorted lexicographically, positions correspond to output partitions.*/
private static final NumberFormat NUMBER_FORMAT = NumberFormat.getInstance();
protected static final String BASE_OUTPUT_NAME = "mapreduce.output.basename";
protected static final String PART = "part";
static {
NUMBER_FORMAT.setMinimumIntegerDigits(5);
NUMBER_FORMAT.setGroupingUsed(false);
}
private FileOutputCommitter committer = null;
public static final String COMPRESS ="mapreduce.output.fileoutputformat.compress";
public static final String COMPRESS_CODEC =
"mapreduce.output.fileoutputformat.compress.codec";
public static final String COMPRESS_TYPE = "mapreduce.output.fileoutputformat.compress.type";
public static final String OUTDIR = "mapreduce.output.fileoutputformat.outputdir";
@Deprecated
public static enum Counter {
BYTES_WRITTEN
}
/**
* Set whether the output of the job is compressed.
* @param job the job to modify
* @param compress should the output of the job be compressed?
*/
public static void setCompressOutput(Job job, boolean compress) {
job.getConfiguration().setBoolean(FileOutputFormat.COMPRESS, compress);
}
/**
* Is the job output compressed?
* @param job the Job to look in
* @return <code>true</code> if the job output should be compressed,
* <code>false</code> otherwise
*/
public static boolean getCompressOutput(JobContext job) {
return job.getConfiguration().getBoolean(
FileOutputFormat.COMPRESS, false);
}
/**
* Set the {@link CompressionCodec} to be used to compress job outputs.
* @param job the job to modify
* @param codecClass the {@link CompressionCodec} to be used to
* compress the job outputs
*/
public static void
setOutputCompressorClass(Job job,
Class<? extends CompressionCodec> codecClass) {
setCompressOutput(job, true);
job.getConfiguration().setClass(FileOutputFormat.COMPRESS_CODEC,
codecClass,
CompressionCodec.class);
}
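// Driver-side sketch (assumes a Job named "job"): compress the job output with
// gzip. setOutputCompressorClass() already flips the COMPRESS flag, so the
// explicit setCompressOutput() call just makes the intent obvious.
//
//   FileOutputFormat.setCompressOutput(job, true);
//   FileOutputFormat.setOutputCompressorClass(job,
//       org.apache.hadoop.io.compress.GzipCodec.class);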
/**
* Get the {@link CompressionCodec} for compressing the job outputs.
* @param job the {@link Job} to look in
* @param defaultValue the {@link CompressionCodec} to return if not set
* @return the {@link CompressionCodec} to be used to compress the
* job outputs
* @throws IllegalArgumentException if the class was specified, but not found
*/
public static Class<? extends CompressionCodec>
getOutputCompressorClass(JobContext job,
Class<? extends CompressionCodec> defaultValue) {
Class<? extends CompressionCodec> codecClass = defaultValue;
Configuration conf = job.getConfiguration();
String name = conf.get(FileOutputFormat.COMPRESS_CODEC);
if (name != null) {
try {
codecClass =
conf.getClassByName(name).asSubclass(CompressionCodec.class);
} catch (ClassNotFoundException e) {
throw new IllegalArgumentException("Compression codec " + name +
" was not found.", e);
}
}
return codecClass;
}
public abstract RecordWriter<K, V>
getRecordWriter(TaskAttemptContext job
) throws IOException, InterruptedException;
public void checkOutputSpecs(JobContext job
) throws FileAlreadyExistsException, IOException{
// Ensure that the output directory is set and not already there
Path outDir = getOutputPath(job);
if (outDir == null) {
throw new InvalidJobConfException("Output directory not set.");
}
// get delegation token for outDir's file system
TokenCache.obtainTokensForNamenodes(job.getCredentials(),
new Path[] { outDir }, job.getConfiguration());
if (outDir.getFileSystem(job.getConfiguration()).exists(outDir)) {
outDir.getFileSystem(job.getConfiguration()).delete(outDir,true);
// throw new FileAlreadyExistsException("Output directory " + outDir +
// " already exists");
// System.out.println("输出目录已存在,我知道,我不管");
}
}
/**
* Set the {@link Path} of the output directory for the map-reduce job.
*
* @param job The job to modify
* @param outputDir the {@link Path} of the output directory for
* the map-reduce job.
*/
public static void setOutputPath(Job job, Path outputDir) {
try {
outputDir = outputDir.getFileSystem(job.getConfiguration()).makeQualified(
outputDir);
} catch (IOException e) {
// Throw the IOException as a RuntimeException to be compatible with MR1
throw new RuntimeException(e);
}
job.getConfiguration().set(FileOutputFormat.OUTDIR, outputDir.toString());
}
/**
* Get the {@link Path} to the output directory for the map-reduce job.
*
* @return the {@link Path} to the output directory for the map-reduce job.
* @see FileOutputFormat#getWorkOutputPath(TaskInputOutputContext)
*/
public static Path getOutputPath(JobContext job) {
String name = job.getConfiguration().get(FileOutputFormat.OUTDIR);
return name == null ? null: new Path(name);
}
/**
* Get the {@link Path} to the task's temporary output directory
* for the map-reduce job
*
* <b id="SideEffectFiles">Tasks' Side-Effect Files</b>
*
* <p>Some applications need to create/write-to side-files, which differ from
* the actual job-outputs.
*
* <p>In such cases there could be issues with 2 instances of the same TIP
* (running simultaneously e.g. speculative tasks) trying to open/write-to the
* same file (path) on HDFS. Hence the application-writer will have to pick
* unique names per task-attempt (e.g. using the attemptid, say
* <tt>attempt_200709221812_0001_m_000000_0</tt>), not just per TIP.</p>
*
* <p>To get around this the Map-Reduce framework helps the application-writer
* out by maintaining a special
* <tt>{mapreduce.output.fileoutputformat.outputdir}/_temporary/_{taskid}</tt>
* sub-directory for each task-attempt on HDFS where the output of the
* task-attempt goes. On successful completion of the task-attempt the files
* in the <tt>{mapreduce.output.fileoutputformat.outputdir}/_temporary/_{taskid}</tt> (only)
* are <i>promoted</i> to <tt>{mapreduce.output.fileoutputformat.outputdir}</tt>. Of course, the
* framework discards the sub-directory of unsuccessful task-attempts. This
* is completely transparent to the application.</p>
*
* <p>The application-writer can take advantage of this by creating any
* side-files required in a work directory during execution
* of his task i.e. via
* {@link #getWorkOutputPath(TaskInputOutputContext)}, and
* the framework will move them out similarly - thus she doesn't have to pick
* unique paths per task-attempt.</p>
*
* <p>The entire discussion holds true for maps of jobs with
* reducer=NONE (i.e. 0 reduces) since output of the map, in that case,
* goes directly to HDFS.</p>
*
* @return the {@link Path} to the task's temporary output directory
* for the map-reduce job.
*/
public static Path getWorkOutputPath(TaskInputOutputContext<?,?,?,?> context
) throws IOException, InterruptedException {
FileOutputCommitter committer = (FileOutputCommitter) context.getOutputCommitter();
return committer.getWorkPath();
}
/**
* Helper function to generate a {@link Path} for a file that is unique for
* the task within the job output directory.
*
* <p>The path can be used to create custom files from within the map and
* reduce tasks. The path name will be unique for each task. The path parent
will be the job output directory.</p>
*
* <p>This method uses the {@link #getUniqueFile} method to make the file name
* unique for the task.</p>
*
* @param context the context for the task.
* @param name the name for the file.
* @param extension the extension for the file
* @return a unique path across all tasks of the job.
*/
public static Path getPathForWorkFile(TaskInputOutputContext<?,?,?,?> context, String name,
String extension
) throws IOException, InterruptedException {
return new Path(getWorkOutputPath(context),
getUniqueFile(context, name, extension));
}
/**
* Generate a unique filename, based on the task id, name, and extension
* @param context the task that is calling this
* @param name the base filename
* @param extension the filename extension
* @return a string like name-[mrsct]-id followed by the extension
*/
public synchronized static String getUniqueFile(TaskAttemptContext context,
String name,
String extension) {
TaskID taskId = context.getTaskAttemptID().getTaskID();
int partition = taskId.getId();
StringBuilder result = new StringBuilder();
result.append(name);
result.append('-');
result.append(
TaskID.getRepresentingCharacter(taskId.getTaskType()));
result.append('-');
result.append(NUMBER_FORMAT.format(partition));
result.append(extension);
return result.toString();
}
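// Examples of the names produced above (TaskID.getRepresentingCharacter()
// yields 'm' for map tasks and 'r' for reduce tasks, and NUMBER_FORMAT pads
// the partition to five digits):
//
//   name = "part", reduce task 0, extension ""    -> "part-r-00000"
//   name = "part", map task 3,    extension ".gz" -> "part-m-00003.gz"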
/**
* Get the default path and filename for the output format.
* @param context the task context
* @param extension an extension to add to the filename
* @return a full path $output/_temporary/$taskid/part-[mr]-$id
* @throws IOException
*/
public Path getDefaultWorkFile(TaskAttemptContext context,
String extension) throws IOException{
FileOutputCommitter committer =
(FileOutputCommitter) getOutputCommitter(context);
return new Path(committer.getWorkPath(), getUniqueFile(context,
getOutputName(context), extension));
}
/**
* Get the base output name for the output file.
*/
protected static String getOutputName(JobContext job) {
return job.getConfiguration().get(BASE_OUTPUT_NAME, PART);
}
/**
* Set the base output name for output file to be created.
*/
protected static void setOutputName(JobContext job, String name) {
job.getConfiguration().set(BASE_OUTPUT_NAME, name);
}
public synchronized
OutputCommitter getOutputCommitter(TaskAttemptContext context
) throws IOException {
if (committer == null) {
Path output = getOutputPath(context);
committer = new FileOutputCommitter(output, context);
}
return committer;
}
}
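A minimal driver sketch (class and path names are hypothetical) showing where the two classes above plug into a job: FileInputFormat supplies the input splits computed in getSplits(), and FileOutputFormat supplies the committer and the part-file naming shown in getUniqueFile(). It uses the stock TextInputFormat/TextOutputFormat subclasses from the Hadoop lib.input and lib.output packages.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

public class SplitDemoDriver {
    public static void main(String[] args) throws Exception {
        Job job = Job.getInstance(new Configuration(), "split-demo");
        job.setJarByClass(SplitDemoDriver.class);
        job.setMapperClass(Mapper.class);        // identity mapper
        job.setNumReduceTasks(0);                // map-only job
        job.setOutputKeyClass(LongWritable.class);
        job.setOutputValueClass(Text.class);

        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);
        FileInputFormat.addInputPath(job, new Path("/demo/input"));     // hypothetical paths
        FileOutputFormat.setOutputPath(job, new Path("/demo/output"));

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}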