1 /**
2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
9 *
10 * http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
17 */
18
19 package org.apache.hadoop.fs;
20
import java.io.*;
import java.util.*;

import org.apache.commons.logging.*;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.util.*;
import org.apache.hadoop.util.DiskChecker.DiskErrorException;
31
32 /** An implementation of a round-robin scheme for disk allocation for creating
33 * files. The way it works is that it is kept track what disk was last
34 * allocated for a file write. For the current request, the next disk from
35 * the set of disks would be allocated if the free space on the disk is
36 * sufficient enough to accomodate the file that is being considered for
37 * creation. If the space requirements cannot be met, the next disk in order
38 * would be tried and so on till a disk is found with sufficient capacity.
39 * Once a disk with sufficient space is identified, a check is done to make
40 * sure that the disk is writable. Also, there is an API provided that doesn't
41 * take the space requirements into consideration but just checks whether the
42 * disk under consideration is writable (this should be used for cases where
43 * the file size is not known apriori). An API is provided to read a path that
44 * was created earlier. That API works by doing a scan of all the disks for the
45 * input pathname.
46 * This implementation also provides the functionality of having multiple
47 * allocators per JVM (one for each unique functionality or context, like
48 * mapred, dfs-client, etc.). It ensures that there is only one instance of
49 * an allocator per context per JVM.
50 * Note:
51 * 1. The contexts referred above are actually the configuration items defined
52 * in the Configuration class like "mapred.local.dir" (for which we want to
53 * control the dir allocations). The context-strings are exactly those
54 * configuration items.
55 * 2. This implementation does not take into consideration cases where
56 * a disk becomes read-only or goes out of space while a file is being written
57 * to (disks are shared between multiple processes, and so the latter situation
58 * is probable).
59 * 3. In the class implementation, "Disk" is referred to as "Dir", which
60 * actually points to the configured directory on the Disk which will be the
61 * parent for all file write/read allocations.
62 */
63 public class LocalDirAllocator {
64
65 //A Map from the config item names like "mapred.local.dir",
66 //"dfs.client.buffer.dir" to the instance of the AllocatorPerContext. This
67 //is a static object to make sure there exists exactly one instance per JVM
68 private static Map <String, AllocatorPerContext> contexts =
69 new TreeMap<String, AllocatorPerContext>();
70 private String contextCfgItemName;
71
72 /**Create an allocator object
73 * @param contextCfgItemName
74 */
75 public LocalDirAllocator(String contextCfgItemName) {
76 this.contextCfgItemName = contextCfgItemName;
77 }
78
79 /** This method must be used to obtain the dir allocation context for a
80 * particular value of the context name. The context name must be an item
81 * defined in the Configuration object for which we want to control the
82 * dir allocations (e.g., <code>mapred.local.dir</code>). The method will
83 * create a context for that name if it doesn't already exist.
84 */
85 private AllocatorPerContext obtainContext(String contextCfgItemName) {
86 synchronized (contexts) {
87 AllocatorPerContext l = contexts.get(contextCfgItemName);
88 if (l == null) {
89 contexts.put(contextCfgItemName,
90 (l = new AllocatorPerContext(contextCfgItemName)));
91 }
92 return l;
93 }
94 }
95
96 /** Get a path from the local FS. This method should be used if the size of
97 * the file is not known apriori. We go round-robin over the set of disks
98 * (via the configured dirs) and return the first complete path where
99 * we could create the parent directory of the passed path.
100 * @param pathStr the requested path (this will be created on the first
101 * available disk)
102 * @param conf the Configuration object
103 * @return the complete path to the file on a local disk
104 * @throws IOException
105 */
106 public Path getLocalPathForWrite(String pathStr,
107 Configuration conf) throws IOException {
108 return getLocalPathForWrite(pathStr, -1, conf);
109 }
110
111 /** Get a path from the local FS. Pass size as -1 if not known apriori. We
112 * round-robin over the set of disks (via the configured dirs) and return
113 * the first complete path which has enough space
114 * @param pathStr the requested path (this will be created on the first
115 * available disk)
116 * @param size the size of the file that is going to be written
117 * @param conf the Configuration object
118 * @return the complete path to the file on a local disk
119 * @throws IOException
120 */
121 public Path getLocalPathForWrite(String pathStr, long size,
122 Configuration conf) throws IOException {
123 AllocatorPerContext context = obtainContext(contextCfgItemName);
124 return context.getLocalPathForWrite(pathStr, size, conf);
125 }
126
127 /** Get a path from the local FS for reading. We search through all the
128 * configured dirs for the file's existence and return the complete
129 * path to the file when we find one
130 * @param pathStr the requested file (this will be searched)
131 * @param conf the Configuration object
132 * @return the complete path to the file on a local disk
133 * @throws IOException
134 */
135 public Path getLocalPathToRead(String pathStr,
136 Configuration conf) throws IOException {
137 AllocatorPerContext context = obtainContext(contextCfgItemName);
138 return context.getLocalPathToRead(pathStr, conf);
139 }
140
141 /** Creates a temporary file in the local FS. Pass size as -1 if not known
142 * apriori. We round-robin over the set of disks (via the configured dirs)
143 * and select the first complete path which has enough space. A file is
144 * created on this directory. The file is guaranteed to go away when the
145 * JVM exits.
146 * @param pathStr prefix for the temporary file
147 * @param size the size of the file that is going to be written
148 * @param conf the Configuration object
149 * @return a unique temporary file
150 * @throws IOException
151 */
152 public File createTmpFileForWrite(String pathStr, long size,
153 Configuration conf) throws IOException {
154 AllocatorPerContext context = obtainContext(contextCfgItemName);
155 return context.createTmpFileForWrite(pathStr, size, conf);
156 }
157
158 /** Method to check whether a context is valid
159 * @param contextCfgItemName
160 * @return true/false
161 */
162 public static boolean isContextValid(String contextCfgItemName) {
163 synchronized (contexts) {
164 return contexts.containsKey(contextCfgItemName);
165 }
166 }
167
168 private static class AllocatorPerContext {
169
170 private final Log LOG =
171 LogFactory.getLog("org.apache.hadoop.fs.AllocatorPerContext");
172
173 private int dirNumLastAccessed;
174 private FileSystem localFS;
175 private DF[] dirDF;
176 private String contextCfgItemName;
177 private String[] localDirs;
178 private String savedLocalDirs = "";
179
180 public AllocatorPerContext(String contextCfgItemName) {
181 this.contextCfgItemName = contextCfgItemName;
182 }
183
184 /** This method gets called everytime before any read/write to make sure
185 * that any change to localDirs is reflected immediately.
186 */
187 private void confChanged(Configuration conf) throws IOException {
188 String newLocalDirs = conf.get(contextCfgItemName);
189 if (!newLocalDirs.equals(savedLocalDirs)) {
190 localDirs = conf.getStrings(contextCfgItemName);
191 localFS = FileSystem.getLocal(conf);
192 int numDirs = localDirs.length;
193 dirDF = new DF[numDirs];
194 for (int i = 0; i < numDirs; i++) {
195 try {
196 localFS.mkdirs(new Path(localDirs[i]));
197 } catch (IOException ie) { } //ignore
198 dirDF[i] = new DF(new File(localDirs[i]), 30000);
199 }
200 dirNumLastAccessed = 0;
201 savedLocalDirs = newLocalDirs;
202 }
203 }
204
205 private Path createPath(String path) throws IOException {
206 Path file = new Path(new Path(localDirs[dirNumLastAccessed]),
207 path);
208 //check whether we are able to create a directory here. If the disk
209 //happens to be RDONLY we will fail
210 try {
211 DiskChecker.checkDir(new File(file.getParent().toUri().getPath()));
212 return file;
213 } catch (DiskErrorException d) {
214 LOG.warn(StringUtils.stringifyException(d));
215 return null;
216 }
217 }
218
219 /** Get a path from the local FS. This method should be used if the size of
220 * the file is not known apriori. We go round-robin over the set of disks
221 * (via the configured dirs) and return the first complete path where
222 * we could create the parent directory of the passed path.
223 */
224 public synchronized Path getLocalPathForWrite(String path,
225 Configuration conf) throws IOException {
226 return getLocalPathForWrite(path, -1, conf);
227 }
228
229 /** Get a path from the local FS. Pass size as -1 if not known apriori. We
230 * round-robin over the set of disks (via the configured dirs) and return
231 * the first complete path which has enough space
232 */
233 public synchronized Path getLocalPathForWrite(String pathStr, long size,
234 Configuration conf) throws IOException {
235 confChanged(conf);
236 int numDirs = localDirs.length;
237 int numDirsSearched = 0;
238 //remove the leading slash from the path (to make sure that the uri
239 //resolution results in a valid path on the dir being checked)
240 if (pathStr.startsWith("/")) {
241 pathStr = pathStr.substring(1);
242 }
243 Path returnPath = null;
244 while (numDirsSearched < numDirs && returnPath == null) {
245 if (size >= 0) {
246 long capacity = dirDF[dirNumLastAccessed].getAvailable();
247 if (capacity > size) {
248 returnPath = createPath(pathStr);
249 }
250 } else {
251 returnPath = createPath(pathStr);
252 }
253 dirNumLastAccessed++;
254 dirNumLastAccessed = dirNumLastAccessed % numDirs;
255 numDirsSearched++;
256 }
257
258 if (returnPath != null) {
259 return returnPath;
260 }
261
262 //no path found
263 throw new DiskErrorException("Could not find any valid local " +
264 "directory for " + pathStr);
265 }
266
267 /** Creates a file on the local FS. Pass size as -1 if not known apriori. We
268 * round-robin over the set of disks (via the configured dirs) and return
269 * a file on the first path which has enough space. The file is guaranteed
270 * to go away when the JVM exits.
271 */
272 public File createTmpFileForWrite(String pathStr, long size,
273 Configuration conf) throws IOException {
274
275 // find an appropriate directory
276 Path path = getLocalPathForWrite(pathStr, size, conf);
277 File dir = new File(path.getParent().toUri().getPath());
278 String prefix = path.getName();
279
280 // create a temp file on this directory
281 File result = File.createTempFile(prefix, null, dir);
282 result.deleteOnExit();
283 return result;
284 }
285
286 /** Get a path from the local FS for reading. We search through all the
287 * configured dirs for the file's existence and return the complete
288 * path to the file when we find one
289 */
290 public synchronized Path getLocalPathToRead(String pathStr,
291 Configuration conf) throws IOException {
292 confChanged(conf);
293 int numDirs = localDirs.length;
294 int numDirsSearched = 0;
295 //remove the leading slash from the path (to make sure that the uri
296 //resolution results in a valid path on the dir being checked)
297 if (pathStr.startsWith("/")) {
298 pathStr = pathStr.substring(1);
299 }
300 while (numDirsSearched < numDirs) {
301 Path file = new Path(localDirs[numDirsSearched], pathStr);
302 if (localFS.exists(file)) {
303 return file;
304 }
305 numDirsSearched++;
306 }
307
308 //no path found
309 throw new DiskErrorException ("Could not find " + pathStr +" in any of" +
310 " the configured local directories");
311 }
312 }
313 }