1 /**
2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
9 *
10 * http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
17 */
18
19 package org.apache.hadoop.mapred;
20
21 import java.io.IOException;
22
23 import org.apache.hadoop.fs;
24 import org.apache.hadoop.fs.LocalDirAllocator;
25 import org.apache.hadoop.conf;
26
27 /**
28 * Manipulate the working area for the transient store for maps and reduces.
29 */
30 class MapOutputFile {
31
32 private JobConf conf;
33 private LocalDirAllocator lDirAlloc =
34 new LocalDirAllocator("mapred.local.dir");
35
36 /** Return the path to local map output file created earlier
37 * @param mapTaskId a map task id
38 */
39 public Path getOutputFile(String mapTaskId)
40 throws IOException {
41 return lDirAlloc.getLocalPathToRead(mapTaskId+"/file.out", conf);
42 }
43
44 /** Create a local map output file name.
45 * @param mapTaskId a map task id
46 * @param size the size of the file
47 */
48 public Path getOutputFileForWrite(String mapTaskId, long size)
49 throws IOException {
50 return lDirAlloc.getLocalPathForWrite(mapTaskId+"/file.out", size, conf);
51 }
52
53 /** Return the path to a local map output index file created earlier
54 * @param mapTaskId a map task id
55 */
56 public Path getOutputIndexFile(String mapTaskId)
57 throws IOException {
58 return lDirAlloc.getLocalPathToRead(mapTaskId + "/file.out.index", conf);
59 }
60
61 /** Create a local map output index file name.
62 * @param mapTaskId a map task id
63 * @param size the size of the file
64 */
65 public Path getOutputIndexFileForWrite(String mapTaskId, long size)
66 throws IOException {
67 return lDirAlloc.getLocalPathForWrite(mapTaskId + "/file.out.index",
68 size, conf);
69 }
70
71 /** Return a local map spill file created earlier.
72 * @param mapTaskId a map task id
73 * @param spillNumber the number
74 */
75 public Path getSpillFile(String mapTaskId, int spillNumber)
76 throws IOException {
77 return lDirAlloc.getLocalPathToRead(mapTaskId+"/spill" +spillNumber+".out",
78 conf);
79 }
80
81 /** Create a local map spill file name.
82 * @param mapTaskId a map task id
83 * @param spillNumber the number
84 * @param size the size of the file
85 */
86 public Path getSpillFileForWrite(String mapTaskId, int spillNumber,
87 long size) throws IOException {
88 return lDirAlloc.getLocalPathForWrite(mapTaskId+
89 "/spill" +spillNumber+".out",
90 size, conf);
91 }
92
93 /** Return a local map spill index file created earlier
94 * @param mapTaskId a map task id
95 * @param spillNumber the number
96 */
97 public Path getSpillIndexFile(String mapTaskId, int spillNumber)
98 throws IOException {
99 return lDirAlloc.getLocalPathToRead(
100 mapTaskId+"/spill" +spillNumber+".out.index", conf);
101 }
102
103 /** Create a local map spill index file name.
104 * @param mapTaskId a map task id
105 * @param spillNumber the number
106 * @param size the size of the file
107 */
108 public Path getSpillIndexFileForWrite(String mapTaskId, int spillNumber,
109 long size) throws IOException {
110 return lDirAlloc.getLocalPathForWrite(
111 mapTaskId+"/spill" +spillNumber+".out.index", size, conf);
112 }
113
114 /** Return a local reduce input file created earlier
115 * @param mapTaskId a map task id
116 * @param reduceTaskId a reduce task id
117 */
118 public Path getInputFile(int mapId, String reduceTaskId)
119 throws IOException {
120 // TODO *oom* should use a format here
121 return lDirAlloc.getLocalPathToRead(reduceTaskId + "/map_"+mapId+".out",
122 conf);
123 }
124
125 /** Create a local reduce input file name.
126 * @param mapTaskId a map task id
127 * @param reduceTaskId a reduce task id
128 * @param size the size of the file
129 */
130 public Path getInputFileForWrite(int mapId, String reduceTaskId, long size)
131 throws IOException {
132 // TODO *oom* should use a format here
133 return lDirAlloc.getLocalPathForWrite(reduceTaskId + "/map_"+mapId+".out",
134 size, conf);
135 }
136
137 /** Removes all of the files related to a task. */
138 public void removeAll(String taskId) throws IOException {
139 conf.deleteLocalFiles(taskId);
140 }
141
142 /**
143 * Removes all contents of temporary storage. Called upon
144 * startup, to remove any leftovers from previous run.
145 */
146 public void cleanupStorage() throws IOException {
147 conf.deleteLocalFiles();
148 }
149
150 public void setConf(Configuration conf) {
151 if (conf instanceof JobConf) {
152 this.conf = (JobConf) conf;
153 } else {
154 this.conf = new JobConf(conf);
155 }
156 }
157 }