1 /**
2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
9 *
10 * http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
17 */
18 package org.apache.hadoop.dfs;
19
20 import org.apache.commons.logging;
21
22 import org.apache.hadoop.fs.Path;
23 import org.apache.hadoop.ipc;
24 import org.apache.hadoop.conf;
25 import org.apache.hadoop.metrics.MetricsUtil;
26 import org.apache.hadoop.net.DNS;
27 import org.apache.hadoop.net.NodeBase;
28 import org.apache.hadoop.util;
29 import org.apache.hadoop.util.DiskChecker.DiskErrorException;
30 import org.apache.hadoop.util.DiskChecker.DiskOutOfSpaceException;
31 import org.apache.hadoop.mapred.StatusHttpServer;
32 import org.apache.hadoop.net.NetworkTopology;
33 import org.apache.hadoop.dfs.BlockCommand;
34 import org.apache.hadoop.dfs.DatanodeProtocol;
35 import org.apache.hadoop.fs.FileUtil;
36
37 import java.io;
38 import java.net;
39 import java.util;
40 import org.apache.hadoop.metrics.MetricsContext;
41 import org.apache.hadoop.metrics.MetricsRecord;
42 import org.apache.hadoop.metrics.Updater;
43 import org.apache.hadoop.metrics.jvm.JvmMetrics;
44
45 /**********************************************************
46 * DataNode is a class (and program) that stores a set of
47 * blocks for a DFS deployment. A single deployment can
48 * have one or many DataNodes. Each DataNode communicates
49 * regularly with a single NameNode. It also communicates
50 * with client code and other DataNodes from time to time.
51 *
52 * DataNodes store a series of named blocks. The DataNode
53 * allows client code to read these blocks, or to write new
54 * block data. The DataNode may also, in response to instructions
55 * from its NameNode, delete blocks or copy blocks to/from other
56 * DataNodes.
57 *
58 * The DataNode maintains just one critical table:
59 * block-> stream of bytes (of BLOCK_SIZE or less)
60 *
61 * This info is stored on a local disk. The DataNode
62 * reports the table's contents to the NameNode upon startup
63 * and every so often afterwards.
64 *
65 * DataNodes spend their lives in an endless loop of asking
66 * the NameNode for something to do. A NameNode cannot connect
67 * to a DataNode directly; a NameNode simply returns values from
68 * functions invoked by a DataNode.
69 *
70 * DataNodes maintain an open server socket so that client code
71 * or other DataNodes can read/write data. The host/port for
72 * this server is reported to the NameNode, which then sends that
73 * information to clients or other DataNodes that might be interested.
74 *
75 **********************************************************/
76 public class DataNode implements FSConstants, Runnable {
77 public static final Log LOG = LogFactory.getLog("org.apache.hadoop.dfs.DataNode");
78
79 /**
80 * Util method to build socket addr from either:
81 * <host>:<post>
82 * <fs>://<host>:<port>/<path>
83 */
84 public static InetSocketAddress createSocketAddr(String target
85 ) throws IOException {
86 int colonIndex = target.indexOf(':');
87 if (colonIndex < 0) {
88 throw new RuntimeException("Not a host:port pair: " + target);
89 }
90 String hostname;
91 int port;
92 if (!target.contains("/")) {
93 // must be the old style <host>:<port>
94 hostname = target.substring(0, colonIndex);
95 port = Integer.parseInt(target.substring(colonIndex + 1));
96 } else {
97 // a new uri
98 URI addr = new Path(target).toUri();
99 hostname = addr.getHost();
100 port = addr.getPort();
101 }
102
103 return new InetSocketAddress(hostname, port);
104 }
105
// RPC proxy to the name-node; created in startDataNode() via RPC.waitForProxy().
DatanodeProtocol namenode = null;
// Local block storage; created in startDataNode().
FSDataset data = null;
// This node's registration; built in startDataNode() and replaced by the
// value the name-node returns from register().
DatanodeRegistration dnRegistration = null;
// Network location sent with the registration: "dfs.datanode.rack" if set,
// otherwise whatever getNetworkLoc(conf) returns.
private String networkLoc;
// Cleared by shutdown(); the service and accept loops run while it is true.
volatile boolean shouldRun = true;
// Blocks stored by writeBlock() that have not yet been reported to the
// name-node via blockReceived(). Also the monitor offerService() waits on
// and writeBlock() notifies when a block arrives.
LinkedList<Block> receivedBlockList = new LinkedList<Block>();
// Reported to the name-node in every heartbeat.
// NOTE(review): not updated in the visible code -- confirm whether writers
// elsewhere need synchronization.
int xmitsInProgress = 0;
// Daemon thread wrapping the DataXceiveServer that accepts data connections.
Daemon dataXceiveServer = null;
// Block report period in ms: "dfs.blockreport.intervalMsec" minus random jitter.
long blockReportInterval;
// System.currentTimeMillis() of the last block report; 0 until the first one.
long lastBlockReport = 0;
// System.currentTimeMillis() of the last heartbeat; reset to 0 to force one.
long lastHeartbeat = 0;
// Heartbeat period in ms ("dfs.heartbeat.interval" is read in seconds).
long heartBeatInterval;
// On-disk storage state for the data directories; locked in startDataNode().
private DataStorage storage = null;
// HTTP status server (serves /streamFile/*); started in startDataNode().
private StatusHttpServer infoServer = null;
// I/O counters published through the metrics framework.
private DataNodeMetrics myMetrics;
// Name-node address; static, so shared by every DataNode in the JVM.
private static InetSocketAddress nameNodeAddr;
// The most recently constructed DataNode; returned by getDataNode().
private static DataNode datanodeObject = null;
// Thread interrupted and joined by shutdown().
// NOTE(review): assigned outside the visible code -- confirm where.
private static Thread dataNodeThread = null;
// Local host name as resolved by DNS.getDefaultHost() in startDataNode().
String machineName;
// "io.bytes.per.checksum", clamped to at least 1 in startDataNode().
int defaultBytesPerChecksum = 512;
126
127 private static class DataNodeMetrics implements Updater {
128 private final MetricsRecord metricsRecord;
129 private int bytesWritten = 0;
130 private int bytesRead = 0;
131 private int blocksWritten = 0;
132 private int blocksRead = 0;
133 private int blocksReplicated = 0;
134 private int blocksRemoved = 0;
135
136 DataNodeMetrics(Configuration conf) {
137 String sessionId = conf.get("session.id");
138 // Initiate reporting of Java VM metrics
139 JvmMetrics.init("DataNode", sessionId);
140 // Create record for DataNode metrics
141 MetricsContext context = MetricsUtil.getContext("dfs");
142 metricsRecord = MetricsUtil.createRecord(context, "datanode");
143 metricsRecord.setTag("sessionId", sessionId);
144 context.registerUpdater(this);
145 }
146
147 /**
148 * Since this object is a registered updater, this method will be called
149 * periodically, e.g. every 5 seconds.
150 */
151 public void doUpdates(MetricsContext unused) {
152 synchronized (this) {
153 metricsRecord.incrMetric("bytes_read", bytesRead);
154 metricsRecord.incrMetric("bytes_written", bytesWritten);
155 metricsRecord.incrMetric("blocks_read", blocksRead);
156 metricsRecord.incrMetric("blocks_written", blocksWritten);
157 metricsRecord.incrMetric("blocks_replicated", blocksReplicated);
158 metricsRecord.incrMetric("blocks_removed", blocksRemoved);
159
160 bytesWritten = 0;
161 bytesRead = 0;
162 blocksWritten = 0;
163 blocksRead = 0;
164 blocksReplicated = 0;
165 blocksRemoved = 0;
166 }
167 metricsRecord.update();
168 }
169
170 synchronized void readBytes(int nbytes) {
171 bytesRead += nbytes;
172 }
173
174 synchronized void wroteBytes(int nbytes) {
175 bytesWritten += nbytes;
176 }
177
178 synchronized void readBlocks(int nblocks) {
179 blocksRead += nblocks;
180 }
181
182 synchronized void wroteBlocks(int nblocks) {
183 blocksWritten += nblocks;
184 }
185
186 synchronized void replicatedBlocks(int nblocks) {
187 blocksReplicated += nblocks;
188 }
189
190 synchronized void removedBlocks(int nblocks) {
191 blocksRemoved += nblocks;
192 }
193 }
194
195 /**
196 * Create the DataNode given a configuration and an array of dataDirs.
197 * 'dataDirs' is where the blocks are stored.
198 */
199 DataNode(Configuration conf,
200 AbstractList<File> dataDirs) throws IOException {
201
202 myMetrics = new DataNodeMetrics(conf);
203 datanodeObject = this;
204
205 try {
206 startDataNode(conf, dataDirs);
207 } catch (IOException ie) {
208 shutdown();
209 throw ie;
210 }
211 }
212
213 void startDataNode(Configuration conf,
214 AbstractList<File> dataDirs
215 ) throws IOException {
216 // use configured nameserver & interface to get local hostname
217 machineName = DNS.getDefaultHost(
218 conf.get("dfs.datanode.dns.interface","default"),
219 conf.get("dfs.datanode.dns.nameserver","default"));
220 InetSocketAddress nameNodeAddr = createSocketAddr(
221 conf.get("fs.default.name", "local"));
222
223 this.defaultBytesPerChecksum =
224 Math.max(conf.getInt("io.bytes.per.checksum", 512), 1);
225
226 int tmpPort = conf.getInt("dfs.datanode.port", 50010);
227 storage = new DataStorage();
228 // construct registration
229 this.dnRegistration = new DatanodeRegistration(machineName + ":" + tmpPort);
230
231 // connect to name node
232 this.namenode = (DatanodeProtocol)
233 RPC.waitForProxy(DatanodeProtocol.class,
234 DatanodeProtocol.versionID,
235 nameNodeAddr,
236 conf);
237 // get version and id info from the name-node
238 NamespaceInfo nsInfo = handshake();
239
240 // read storage info, lock data dirs and transition fs state if necessary
241 StartupOption startOpt = getStartupOption(conf);
242 assert startOpt != null : "Startup option must be set.";
243 storage.recoverTransitionRead(nsInfo, dataDirs, startOpt);
244 // adjust
245 this.dnRegistration.setStorageInfo(storage);
246
247 // initialize data node internal structure
248 this.data = new FSDataset(storage, conf);
249
250 // find free port
251 ServerSocket ss = null;
252 String bindAddress = conf.get("dfs.datanode.bindAddress", "0.0.0.0");
253 while (ss == null) {
254 try {
255 ss = new ServerSocket(tmpPort, 0, InetAddress.getByName(bindAddress));
256 LOG.info("Opened server at " + tmpPort);
257 } catch (IOException ie) {
258 LOG.info("Could not open server at " + tmpPort + ", trying new port");
259 tmpPort++;
260 }
261 }
262 // adjust machine name with the actual port
263 this.dnRegistration.setName(machineName + ":" + tmpPort);
264
265 this.dataXceiveServer = new Daemon(new DataXceiveServer(ss));
266
267 long blockReportIntervalBasis =
268 conf.getLong("dfs.blockreport.intervalMsec", BLOCKREPORT_INTERVAL);
269 this.blockReportInterval =
270 blockReportIntervalBasis - new Random().nextInt((int)(blockReportIntervalBasis/10));
271 this.heartBeatInterval = conf.getLong("dfs.heartbeat.interval", HEARTBEAT_INTERVAL) * 1000L;
272 DataNode.nameNodeAddr = nameNodeAddr;
273
274 //create a servlet to serve full-file content
275 int infoServerPort = conf.getInt("dfs.datanode.info.port", 50075);
276 String infoServerBindAddress = conf.get("dfs.datanode.info.bindAddress", "0.0.0.0");
277 this.infoServer = new StatusHttpServer("datanode", infoServerBindAddress, infoServerPort, true);
278 this.infoServer.addServlet(null, "/streamFile/*", StreamFile.class);
279 this.infoServer.start();
280 // adjust info port
281 this.dnRegistration.setInfoPort(this.infoServer.getPort());
282 // get network location
283 this.networkLoc = conf.get("dfs.datanode.rack");
284 if (networkLoc == null) // exec network script or set the default rack
285 networkLoc = getNetworkLoc(conf);
286 // register datanode
287 register();
288 }
289
290 private NamespaceInfo handshake() throws IOException {
291 NamespaceInfo nsInfo = new NamespaceInfo();
292 while (shouldRun) {
293 try {
294 nsInfo = namenode.versionRequest();
295 break;
296 } catch(SocketTimeoutException e) { // namenode is busy
297 LOG.info("Problem connecting to server: " + getNameNodeAddr());
298 try {
299 Thread.sleep(1000);
300 } catch (InterruptedException ie) {}
301 }
302 }
303 String errorMsg = null;
304 // verify build version
305 if( ! nsInfo.getBuildVersion().equals( Storage.getBuildVersion() )) {
306 errorMsg = "Incompatible build versions: namenode BV = "
307 + nsInfo.getBuildVersion() + "; datanode BV = "
308 + Storage.getBuildVersion();
309 LOG.fatal( errorMsg );
310 try {
311 namenode.errorReport( dnRegistration,
312 DatanodeProtocol.NOTIFY, errorMsg );
313 } catch( SocketTimeoutException e ) { // namenode is busy
314 LOG.info("Problem connecting to server: " + getNameNodeAddr());
315 }
316 throw new IOException( errorMsg );
317 }
318 assert FSConstants.LAYOUT_VERSION == nsInfo.getLayoutVersion() :
319 "Data-node and name-node layout versions must be the same.";
320 return nsInfo;
321 }
322
323 /** Return the DataNode object
324 *
325 */
326 public static DataNode getDataNode() {
327 return datanodeObject;
328 }
329
330 public InetSocketAddress getNameNodeAddr() {
331 return nameNodeAddr;
332 }
333
334 /**
335 * Return the namenode's identifier
336 */
337 public String getNamenode() {
338 //return namenode.toString();
339 return "<namenode>";
340 }
341
342 /**
343 * Register datanode
344 * <p>
345 * The datanode needs to register with the namenode on startup in order
346 * 1) to report which storage it is serving now and
347 * 2) to receive a registrationID
348 * issued by the namenode to recognize registered datanodes.
349 *
350 * @see FSNamesystem#registerDatanode(DatanodeRegistration,String)
351 * @throws IOException
352 */
353 private void register() throws IOException {
354 while(shouldRun) {
355 try {
356 // reset name to machineName. Mainly for web interface.
357 dnRegistration.name = machineName + ":" + dnRegistration.getPort();
358 dnRegistration = namenode.register(dnRegistration, networkLoc);
359 break;
360 } catch(SocketTimeoutException e) { // namenode is busy
361 LOG.info("Problem connecting to server: " + getNameNodeAddr());
362 try {
363 Thread.sleep(1000);
364 } catch (InterruptedException ie) {}
365 }
366 }
367 assert ("".equals(storage.getStorageID())
368 && !"".equals(dnRegistration.getStorageID()))
369 || storage.getStorageID().equals(dnRegistration.getStorageID()) :
370 "New storageID can be assigned only if data-node is not formatted";
371 if (storage.getStorageID().equals("")) {
372 storage.setStorageID(dnRegistration.getStorageID());
373 storage.writeAll();
374 LOG.info("New storage id " + dnRegistration.getStorageID()
375 + " is assigned to data-node " + dnRegistration.getName());
376 }
377 if(! storage.getStorageID().equals(dnRegistration.getStorageID())) {
378 throw new IOException("Inconsistent storage IDs. Name-node returned "
379 + dnRegistration.getStorageID()
380 + ". Expecting " + storage.getStorageID());
381 }
382 }
383
384 /**
385 * Shut down this instance of the datanode.
386 * Returns only after shutdown is complete.
387 */
388 public void shutdown() {
389 if (infoServer != null) {
390 try {
391 infoServer.stop();
392 } catch (Exception e) {
393 }
394 }
395 this.shouldRun = false;
396 if (dataXceiveServer != null) {
397 ((DataXceiveServer) this.dataXceiveServer.getRunnable()).kill();
398 this.dataXceiveServer.interrupt();
399 }
400 if(upgradeManager != null)
401 upgradeManager.shutdownUpgrade();
402 if (storage != null) {
403 try {
404 this.storage.unlockAll();
405 } catch (IOException ie) {
406 }
407 }
408 if (dataNodeThread != null) {
409 dataNodeThread.interrupt();
410 try {
411 dataNodeThread.join();
412 } catch (InterruptedException ie) {
413 }
414 }
415 }
416
417
418 /* Check if there is no space in disk or the disk is read-only
419 * when IOException occurs.
420 * If so, handle the error */
421 private void checkDiskError( IOException e ) throws IOException {
422 if (e.getMessage().startsWith("No space left on device")) {
423 throw new DiskOutOfSpaceException("No space left on device");
424 } else {
425 checkDiskError();
426 }
427 }
428
429 /* Check if there is no disk space and if so, handle the error*/
430 private void checkDiskError( ) throws IOException {
431 try {
432 data.checkDataDir();
433 } catch(DiskErrorException de) {
434 handleDiskError(de.getMessage());
435 }
436 }
437
438 private void handleDiskError(String errMsgr) {
439 LOG.warn("DataNode is shutting down.\n" + errMsgr);
440 try {
441 namenode.errorReport(
442 dnRegistration, DatanodeProtocol.DISK_ERROR, errMsgr);
443 } catch(IOException ignored) {
444 }
445 shutdown();
446 }
447
448 private static class Count {
449 int value = 0;
450 Count(int init) { value = init; }
451 synchronized void incr() { value++; }
452 synchronized void decr() { value--; }
453 public String toString() { return Integer.toString(value); }
454 public int getValue() { return value; }
455 }
456
457 Count xceiverCount = new Count(0);
458
459 /**
460 * Main loop for the DataNode. Runs until shutdown,
461 * forever calling remote NameNode functions.
462 */
463 public void offerService() throws Exception {
464
465 LOG.info("using BLOCKREPORT_INTERVAL of " + blockReportInterval + "msec");
466
467 //
468 // Now loop for a long time....
469 //
470
471 while (shouldRun) {
472 try {
473 long now = System.currentTimeMillis();
474
475 //
476 // Every so often, send heartbeat or block-report
477 //
478 if (now - lastHeartbeat > heartBeatInterval) {
479 //
480 // All heartbeat messages include following info:
481 // -- Datanode name
482 // -- data transfer port
483 // -- Total capacity
484 // -- Bytes remaining
485 //
486 DatanodeCommand cmd = namenode.sendHeartbeat(dnRegistration,
487 data.getCapacity(),
488 data.getRemaining(),
489 xmitsInProgress,
490 xceiverCount.getValue());
491 //LOG.info("Just sent heartbeat, with name " + localName);
492 lastHeartbeat = now;
493 if (!processCommand(cmd))
494 continue;
495 }
496
497 // check if there are newly received blocks
498 Block [] blockArray=null;
499 synchronized(receivedBlockList) {
500 if (receivedBlockList.size() > 0) {
501 //
502 // Send newly-received blockids to namenode
503 //
504 blockArray = receivedBlockList.toArray(new Block[receivedBlockList.size()]);
505 }
506 }
507 if (blockArray != null) {
508 namenode.blockReceived(dnRegistration, blockArray);
509 synchronized (receivedBlockList) {
510 for(Block b: blockArray) {
511 receivedBlockList.remove(b);
512 }
513 }
514 }
515
516 // send block report
517 if (now - lastBlockReport > blockReportInterval) {
518 //
519 // Send latest blockinfo report if timer has expired.
520 // Get back a list of local block(s) that are obsolete
521 // and can be safely GC'ed.
522 //
523 DatanodeCommand cmd = namenode.blockReport(dnRegistration,
524 data.getBlockReport());
525 //
526 // If we have sent the first block report, then wait a random
527 // time before we start the periodic block reports.
528 //
529 if (lastBlockReport == 0) {
530 lastBlockReport = now - new Random().nextInt((int)(blockReportInterval));
531 } else {
532 lastBlockReport = now;
533 }
534 processCommand(cmd);
535 }
536
537 //
538 // There is no work to do; sleep until hearbeat timer elapses,
539 // or work arrives, and then iterate again.
540 //
541 long waitTime = heartBeatInterval - (System.currentTimeMillis() - lastHeartbeat);
542 synchronized(receivedBlockList) {
543 if (waitTime > 0 && receivedBlockList.size() == 0) {
544 try {
545 receivedBlockList.wait(waitTime);
546 } catch (InterruptedException ie) {
547 }
548 }
549 } // synchronized
550 } catch(RemoteException re) {
551 String reClass = re.getClassName();
552 if (UnregisteredDatanodeException.class.getName().equals(reClass) ||
553 DisallowedDatanodeException.class.getName().equals(reClass)) {
554 LOG.warn("DataNode is shutting down: " +
555 StringUtils.stringifyException(re));
556 shutdown();
557 return;
558 }
559 LOG.warn(StringUtils.stringifyException(re));
560 } catch (IOException e) {
561 LOG.warn(StringUtils.stringifyException(e));
562 }
563 } // while (shouldRun)
564 } // offerService
565
566 /**
567 *
568 * @param cmd
569 * @return true if further processing may be required or false otherwise.
570 * @throws IOException
571 */
572 private boolean processCommand(DatanodeCommand cmd) throws IOException {
573 if (cmd == null)
574 return true;
575 switch(cmd.getAction()) {
576 case DatanodeProtocol.DNA_TRANSFER:
577 //
578 // Send a copy of a block to another datanode
579 //
580 BlockCommand bcmd = (BlockCommand)cmd;
581 transferBlocks(bcmd.getBlocks(), bcmd.getTargets());
582 break;
583 case DatanodeProtocol.DNA_INVALIDATE:
584 //
585 // Some local block(s) are obsolete and can be
586 // safely garbage-collected.
587 //
588 Block toDelete[] = ((BlockCommand)cmd).getBlocks();
589 try {
590 data.invalidate(toDelete);
591 } catch(IOException e) {
592 checkDiskError();
593 throw e;
594 }
595 myMetrics.removedBlocks(toDelete.length);
596 break;
597 case DatanodeProtocol.DNA_SHUTDOWN:
598 // shut down the data node
599 this.shutdown();
600 return false;
601 case DatanodeProtocol.DNA_REGISTER:
602 // namenode requested a registration
603 register();
604 lastHeartbeat=0;
605 lastBlockReport=0;
606 break;
607 case DatanodeProtocol.DNA_FINALIZE:
608 storage.finalizeUpgrade();
609 break;
610 case UpgradeCommand.UC_ACTION_START_UPGRADE:
611 // start distributed upgrade here
612 processDistributedUpgradeCommand((UpgradeCommand)cmd);
613 break;
614 default:
615 LOG.warn("Unknown DatanodeCommand action: " + cmd.getAction());
616 }
617 return true;
618 }
619
620 // Distributed upgrade manager
621 UpgradeManagerDatanode upgradeManager = new UpgradeManagerDatanode(this);
622
623 private void processDistributedUpgradeCommand(UpgradeCommand comm
624 ) throws IOException {
625 assert upgradeManager != null : "DataNode.upgradeManager is null.";
626 upgradeManager.processUpgradeCommand(comm);
627 }
628
629
630 /**
631 * Start distributed upgrade if it should be initiated by the data-node.
632 */
633 private void startDistributedUpgradeIfNeeded() throws IOException {
634 UpgradeManagerDatanode um = DataNode.getDataNode().upgradeManager;
635 assert um != null : "DataNode.upgradeManager is null.";
636 if(!um.getUpgradeState())
637 return;
638 um.setUpgradeState(false, um.getUpgradeVersion());
639 um.startUpgrade();
640 return;
641 }
642 private void transferBlocks( Block blocks[],
643 DatanodeInfo xferTargets[][]
644 ) throws IOException {
645 for (int i = 0; i < blocks.length; i++) {
646 if (!data.isValidBlock(blocks[i])) {
647 String errStr = "Can't send invalid block " + blocks[i];
648 LOG.info(errStr);
649 namenode.errorReport(dnRegistration,
650 DatanodeProtocol.INVALID_BLOCK,
651 errStr);
652 break;
653 }
654 if (xferTargets[i].length > 0) {
655 LOG.info("Starting thread to transfer block " + blocks[i] + " to " + xferTargets[i]);
656 new Daemon(new DataTransfer(xferTargets[i], blocks[i])).start();
657 }
658 }
659 }
660
661 /**
662 * Server used for receiving/sending a block of data.
663 * This is created to listen for requests from clients or
664 * other DataNodes. This small server does not use the
665 * Hadoop IPC mechanism.
666 */
667 class DataXceiveServer implements Runnable {
668 ServerSocket ss;
669 public DataXceiveServer(ServerSocket ss) {
670 this.ss = ss;
671 }
672
673 /**
674 */
675 public void run() {
676 try {
677 while (shouldRun) {
678 Socket s = ss.accept();
679 //s.setSoTimeout(READ_TIMEOUT);
680 xceiverCount.incr();
681 new Daemon(new DataXceiver(s)).start();
682 }
683 ss.close();
684 } catch (IOException ie) {
685 LOG.info("Exiting DataXceiveServer due to " + ie.toString());
686 }
687 }
688 public void kill() {
689 assert shouldRun == false :
690 "shoudRun should be set to false before killing";
691 try {
692 this.ss.close();
693 } catch (IOException iex) {
694 }
695 }
696 }
697
698 /**
699 * Thread for processing incoming/outgoing data stream
700 */
701 class DataXceiver implements Runnable {
702 Socket s;
703 public DataXceiver(Socket s) {
704 this.s = s;
705 LOG.debug("Number of active connections is: "+xceiverCount);
706 }
707
708 /**
709 * Read/write data from/to the DataXceiveServer.
710 */
711 public void run() {
712 try {
713 DataInputStream in = new DataInputStream(
714 new BufferedInputStream(s.getInputStream(), BUFFER_SIZE));
715 short version = in.readShort();
716 if ( version != DATA_TRANFER_VERSION ) {
717 throw new IOException( "Version Mismatch" );
718 }
719
720 byte op = in.readByte();
721
722 switch ( op ) {
723 case OP_READ_BLOCK:
724 readBlock( in );
725 break;
726 case OP_WRITE_BLOCK:
727 writeBlock( in );
728 break;
729 case OP_READ_METADATA:
730 readMetadata( in );
731 break;
732 default:
733 throw new IOException("Unknown opcode " + op + "in data stream");
734 }
735 } catch (Throwable t) {
736 LOG.error("DataXceiver: " + StringUtils.stringifyException(t));
737 } finally {
738 try {
739 xceiverCount.decr();
740 LOG.debug("Number of active connections is: "+xceiverCount);
741 s.close();
742 } catch (IOException ie2) {
743 }
744 }
745 }
746
747 /**
748 * Read a block from the disk
749 * @param in The stream to read from
750 * @throws IOException
751 */
752 private void readBlock(DataInputStream in) throws IOException {
753 //
754 // Read in the header
755 //
756 long blockId = in.readLong();
757 Block block = new Block( blockId, 0 );
758
759 long startOffset = in.readLong();
760 long length = in.readLong();
761
762 try {
763 //XXX Buffered output stream?
764 long read = sendBlock(s, block, startOffset, length, null );
765 myMetrics.readBytes((int)read);
766 myMetrics.readBlocks(1);
767 LOG.info("Served block " + block + " to " + s.getInetAddress());
768 } catch ( SocketException ignored ) {
769 // Its ok for remote side to close the connection anytime.
770 myMetrics.readBlocks(1);
771 } catch ( IOException ioe ) {
772 /* What exactly should we do here?
773 * Earlier version shutdown() datanode if there is disk error.
774 */
775 LOG.warn( "Got exception while serving " + block + " to " +
776 s.getInetAddress() + ": " +
777 StringUtils.stringifyException(ioe) );
778 throw ioe;
779 }
780 }
781
782 /**
783 * Write a block to disk.
784 * @param in The stream to read from
785 * @throws IOException
786 */
787 private void writeBlock(DataInputStream in) throws IOException {
788 //
789 // Read in the header
790 //
791 DataOutputStream reply = new DataOutputStream(s.getOutputStream());
792 DataOutputStream out = null;
793 DataOutputStream checksumOut = null;
794 Socket mirrorSock = null;
795 DataOutputStream mirrorOut = null;
796 DataInputStream mirrorIn = null;
797
798 try {
799 /* We need an estimate for block size to check if the
800 * disk partition has enough space. For now we just increment
801 * FSDataset.reserved by configured dfs.block.size
802 * Other alternative is to include the block size in the header
803 * sent by DFSClient.
804 */
805 Block block = new Block( in.readLong(), 0 );
806 int numTargets = in.readInt();
807 if ( numTargets < 0 ) {
808 throw new IOException("Mislabelled incoming datastream.");
809 }
810 DatanodeInfo targets[] = new DatanodeInfo[numTargets];
811 for (int i = 0; i < targets.length; i++) {
812 DatanodeInfo tmp = new DatanodeInfo();
813 tmp.readFields(in);
814 targets[i] = tmp;
815 }
816
817 DataChecksum checksum = DataChecksum.newDataChecksum( in );
818
819 //
820 // Open local disk out
821 //
822 FSDataset.BlockWriteStreams streams = data.writeToBlock( block );
823 out = new DataOutputStream(
824 new BufferedOutputStream(streams.dataOut, BUFFER_SIZE));
825 checksumOut = new DataOutputStream(
826 new BufferedOutputStream(streams.checksumOut, BUFFER_SIZE));
827
828 InetSocketAddress mirrorTarget = null;
829 String mirrorNode = null;
830 //
831 // Open network conn to backup machine, if
832 // appropriate
833 //
834 if (targets.length > 0) {
835 // Connect to backup machine
836 mirrorNode = targets[0].getName();
837 mirrorTarget = createSocketAddr(mirrorNode);
838 try {
839 mirrorSock = new Socket();
840 mirrorSock.connect(mirrorTarget, READ_TIMEOUT);
841 mirrorSock.setSoTimeout(READ_TIMEOUT);
842 mirrorOut = new DataOutputStream(
843 new BufferedOutputStream(mirrorSock.getOutputStream(),
844 BUFFER_SIZE));
845 mirrorIn = new DataInputStream( mirrorSock.getInputStream() );
846 //Copied from DFSClient.java!
847 mirrorOut.writeShort( DATA_TRANFER_VERSION );
848 mirrorOut.write( OP_WRITE_BLOCK );
849 mirrorOut.writeLong( block.getBlockId() );
850 mirrorOut.writeInt( targets.length - 1 );
851 for ( int i = 1; i < targets.length; i++ ) {
852 targets[i].write( mirrorOut );
853 }
854 checksum.writeHeader( mirrorOut );
855 myMetrics.replicatedBlocks(1);
856 } catch (IOException ie) {
857 if (mirrorOut != null) {
858 LOG.info("Exception connecting to mirror " + mirrorNode
859 + "\n" + StringUtils.stringifyException(ie));
860 mirrorOut = null;
861 }
862 }
863 }
864
865 // XXX The following code is similar on both sides...
866
867 int bytesPerChecksum = checksum.getBytesPerChecksum();
868 int checksumSize = checksum.getChecksumSize();
869 byte buf[] = new byte[ bytesPerChecksum + checksumSize ];
870 long blockLen = 0;
871 long lastOffset = 0;
872 long lastLen = 0;
873 short status = -1;
874 boolean headerWritten = false;
875
876 while ( true ) {
877 // Read one data chunk in each loop.
878
879 long offset = lastOffset + lastLen;
880 int len = (int) in.readInt();
881 if ( len < 0 || len > bytesPerChecksum ) {
882 LOG.warn( "Got wrong length during writeBlock(" +
883 block + ") from " + s.getRemoteSocketAddress() +
884 " at offset " + offset + ": " + len +
885 " expected <= " + bytesPerChecksum );
886 status = OP_STATUS_ERROR;
887 break;
888 }
889
890 in.readFully( buf, 0, len + checksumSize );
891
892 if ( len > 0 && checksumSize > 0 ) {
893 /*
894 * Verification is not included in the initial design.
895 * For now, it at least catches some bugs. Later, we can
896 * include this after showing that it does not affect
897 * performance much.
898 */
899 checksum.update( buf, 0, len );
900
901 if ( ! checksum.compare( buf, len ) ) {
902 throw new IOException( "Unexpected checksum mismatch " +
903 "while writing " + block +
904 " from " +
905 s.getRemoteSocketAddress() );
906 }
907
908 checksum.reset();
909 }
910
911 // First write to remote node before writing locally.
912 if (mirrorOut != null) {
913 try {
914 mirrorOut.writeInt( len );
915 mirrorOut.write( buf, 0, len + checksumSize );
916 if (len == 0) {
917 mirrorOut.flush();
918 }
919 } catch (IOException ioe) {
920 LOG.info( "Exception writing to mirror " + mirrorNode +
921 "\n" + StringUtils.stringifyException(ioe) );
922 //
923 // If stream-copy fails, continue
924 // writing to disk. We shouldn't
925 // interrupt client write.
926 //
927 mirrorOut = null;
928 }
929 }
930
931 try {
932 if ( !headerWritten ) {
933 // First DATA_CHUNK.
934 // Write the header even if checksumSize is 0.
935 checksumOut.writeShort( FSDataset.METADATA_VERSION );
936 checksum.writeHeader( checksumOut );
937 headerWritten = true;
938 }
939
940 if ( len > 0 ) {
941 out.write( buf, 0, len );
942 // Write checksum
943 checksumOut.write( buf, len, checksumSize );
944 myMetrics.wroteBytes( len );
945 } else {
946 /* Should we sync() files here? It can add many millisecs of
947 * latency. We did not sync before HADOOP-1134 either.
948 */
949 out.close();
950 out = null;
951 checksumOut.close();
952 checksumOut = null;
953 }
954
955 } catch (IOException iex) {
956 checkDiskError(iex);
957 throw iex;
958 }
959
960 if ( len == 0 ) {
961
962 // We already have one successful write here. Should we
963 // wait for response from next target? We will skip for now.
964
965 block.setNumBytes( blockLen );
966
967 //Does this fsync()?
968 data.finalizeBlock( block );
969 myMetrics.wroteBlocks(1);
970
971 status = OP_STATUS_SUCCESS;
972
973 break;
974 }
975
976 if ( lastLen > 0 && lastLen != bytesPerChecksum ) {
977 LOG.warn( "Got wrong length during writeBlock(" +
978 block + ") from " + s.getRemoteSocketAddress() +
979 " : " + " got " + lastLen + " instead of " +
980 bytesPerChecksum );
981 status = OP_STATUS_ERROR;
982 break;
983 }
984
985 lastOffset = offset;
986 lastLen = len;
987 blockLen += len;
988 }
989 // done with reading the data.
990
991 if ( status == OP_STATUS_SUCCESS ) {
992 /* Informing the name node could take a long long time!
993 Should we wait till namenode is informed before responding
994 with success to the client? For now we don't.
995 */
996 synchronized ( receivedBlockList ) {
997 receivedBlockList.add( block );
998 receivedBlockList.notifyAll();
999 }
1000
1001 String msg = "Received block " + block + " from " +
1002 s.getInetAddress();
1003
1004 if ( mirrorOut != null ) {
1005 //Wait for the remote reply
1006 mirrorOut.flush();
1007 short result = OP_STATUS_ERROR;
1008 try {
1009 result = mirrorIn.readShort();
1010 } catch ( IOException ignored ) {}
1011
1012 msg += " and " + (( result != OP_STATUS_SUCCESS ) ?
1013 "failed to mirror to " : " mirrored to ") +
1014 mirrorTarget;
1015
1016 mirrorOut = null;
1017 }
1018
1019 LOG.info(msg);
1020 }
1021
1022 if ( status >= 0 ) {
1023 try {
1024 reply.writeShort( status );
1025 reply.flush();
1026 } catch ( IOException ignored ) {}
1027 }
1028
1029 } finally {
1030 try {
1031 if ( out != null )
1032 out.close();
1033 if ( checksumOut != null )
1034 checksumOut.close();
1035 if ( mirrorSock != null )
1036 mirrorSock.close();
1037 } catch (IOException iex) {
1038 shutdown();
1039 throw iex;
1040 }
1041 }
1042 }
1043
1044 /**
1045 * Reads the metadata and sends the data in one 'DATA_CHUNK'
1046 * @param in
1047 */
1048 void readMetadata(DataInputStream in) throws IOException {
1049
1050 Block block = new Block( in.readLong(), 0 );
1051 InputStream checksumIn = null;
1052 DataOutputStream out = null;
1053
1054 try {
1055 File blockFile = data.getBlockFile( block );
1056 File checksumFile = FSDataset.getMetaFile( blockFile );
1057 checksumIn = new FileInputStream(checksumFile);
1058
1059 long fileSize = checksumFile.length();
1060 if (fileSize >= 1L<<31 || fileSize <= 0) {
1061 throw new IOException("Unexpected size for checksumFile " +
1062 checksumFile);
1063 }
1064
1065 byte [] buf = new byte[(int)fileSize];
1066 FileUtil.readFully(checksumIn, buf, 0, buf.length);
1067
1068 out = new DataOutputStream(s.getOutputStream());
1069
1070 out.writeByte(OP_STATUS_SUCCESS);
1071 out.writeInt(buf.length);
1072 out.write(buf);
1073
1074 //last DATA_CHUNK
1075 out.writeInt(0);
1076 } finally {
1077 FileUtil.closeStream(checksumIn);
1078 }
1079 }
1080 }
1081
1082 /** sendBlock() is used to read block and its metadata and stream
1083 * the data to either a client or to another datanode.
1084 * If argument targets is null, then it is assumed to be replying
1085 * to a client request (OP_BLOCK_READ). Otherwise, we are replicating
1086 * to another datanode.
1087 *
1088 * returns total bytes reads, including crc.
1089 */
1090 long sendBlock(Socket sock, Block block,
1091 long startOffset, long length, DatanodeInfo targets[] )
1092 throws IOException {
1093 DataOutputStream out = new DataOutputStream(
1094 new BufferedOutputStream(sock.getOutputStream(),
1095 BUFFER_SIZE));
1096 RandomAccessFile blockInFile = null;
1097 DataInputStream blockIn = null;
1098 DataInputStream checksumIn = null;
1099 long totalRead = 0;
1100
1101 /* XXX This will affect inter datanode transfers during
1102 * a CRC upgrade. There should not be any replication
1103 * during crc upgrade since we are in safe mode, right?
1104 */
1105 boolean corruptChecksumOk = targets == null;
1106
1107 try {
1108 File blockFile = data.getBlockFile( block );
1109 blockInFile = new RandomAccessFile(blockFile, "r");
1110
1111 File checksumFile = FSDataset.getMetaFile( blockFile );
1112 DataChecksum checksum = null;
1113
1114 if ( !corruptChecksumOk || checksumFile.exists() ) {
1115 checksumIn = new DataInputStream(
1116 new BufferedInputStream(new FileInputStream(checksumFile),
1117 BUFFER_SIZE));
1118
1119 //read and handle the common header here. For now just a version
1120 short version = checksumIn.readShort();
1121 if ( version != FSDataset.METADATA_VERSION ) {
1122 LOG.warn( "Wrong version (" + version +
1123 ") for metadata file for " + block + " ignoring ..." );
1124 }
1125 checksum = DataChecksum.newDataChecksum( checksumIn ) ;
1126 } else {
1127 LOG.warn( "Could not find metadata file for " + block );
1128 // This only decides the buffer size. Use BUFFER_SIZE?
1129 checksum = DataChecksum.newDataChecksum( DataChecksum.CHECKSUM_NULL,
1130 16*1024 );
1131 }
1132
1133 int bytesPerChecksum = checksum.getBytesPerChecksum();
1134 int checksumSize = checksum.getChecksumSize();
1135
1136 if (length < 0) {
1137 length = data.getLength(block);
1138 }
1139
1140 long endOffset = data.getLength( block );
1141 if ( startOffset < 0 || startOffset > endOffset ||
1142 (length + startOffset) > endOffset ) {
1143 String msg = " Offset " + startOffset + " and length " + length +
1144 " don't match block " + block + " ( blockLen " +
1145 endOffset + " )";
1146 LOG.warn( "sendBlock() : " + msg );
1147 if ( targets != null ) {
1148 throw new IOException(msg);
1149 } else {
1150 out.writeShort( OP_STATUS_ERROR_INVALID );
1151 return totalRead;
1152 }
1153 }
1154
1155 byte buf[] = new byte[ bytesPerChecksum + checksumSize ];
1156 long offset = (startOffset - (startOffset % bytesPerChecksum));
1157 if ( length >= 0 ) {
1158 // Make sure endOffset points to end of a checksumed chunk.
1159 long tmpLen = startOffset + length + (startOffset - offset);
1160 if ( tmpLen % bytesPerChecksum != 0 ) {
1161 tmpLen += ( bytesPerChecksum - tmpLen % bytesPerChecksum );
1162 }
1163 if ( tmpLen < endOffset ) {
1164 endOffset = tmpLen;
1165 }
1166 }
1167
1168 // seek to the right offsets
1169 if ( offset > 0 ) {
1170 long checksumSkip = ( offset / bytesPerChecksum ) * checksumSize ;
1171 blockInFile.seek(offset);
1172 if (checksumSkip > 0) {
1173 //Should we use seek() for checksum file as well?
1174 FileUtil.skipFully(checksumIn, checksumSkip);
1175 }
1176 }
1177
1178 blockIn = new DataInputStream(new BufferedInputStream(
1179 new FileInputStream(blockInFile.getFD()),
1180 BUFFER_SIZE));
1181
1182 if ( targets != null ) {
1183 //
1184 // Header info
1185 //
1186 out.writeShort( DATA_TRANFER_VERSION );
1187 out.writeByte( OP_WRITE_BLOCK );
1188 out.writeLong( block.getBlockId() );
1189 out.writeInt(targets.length-1);
1190 for (int i = 1; i < targets.length; i++) {
1191 targets[i].write( out );
1192 }
1193 } else {
1194 out.writeShort( OP_STATUS_SUCCESS );
1195 }
1196
1197 checksum.writeHeader( out );
1198
1199 if ( targets == null ) {
1200 out.writeLong( offset );
1201 }
1202
1203 while ( endOffset >= offset ) {
1204 // Write one data chunk per loop.
1205 int len = (int) Math.min( endOffset - offset, bytesPerChecksum );
1206 if ( len > 0 ) {
1207 blockIn.readFully( buf, 0, len );
1208 totalRead += len;
1209
1210 if ( checksumSize > 0 && checksumIn != null ) {
1211 try {
1212 checksumIn.readFully( buf, len, checksumSize );
1213 totalRead += checksumSize;
1214 } catch ( IOException e ) {
1215 LOG.warn( " Could not read checksum for data at offset " +
1216 offset + " for block " + block + " got : " +
1217 StringUtils.stringifyException(e) );
1218 FileUtil.closeStream( checksumIn );
1219 checksumIn = null;
1220 if ( corruptChecksumOk ) {
1221 // Just fill the array with zeros.
1222 Arrays.fill( buf, len, len + checksumSize, (byte)0 );
1223 } else {
1224 throw e;
1225 }
1226 }
1227 }
1228 }
1229
1230 out.writeInt( len );
1231 out.write( buf, 0, len + checksumSize );
1232
1233 if ( offset == endOffset ) {
1234 out.flush();
1235 // We are not waiting for response from target.
1236 break;
1237 }
1238 offset += len;
1239 }
1240 } finally {
1241 FileUtil.closeStream( blockInFile );
1242 FileUtil.closeStream( checksumIn );
1243 FileUtil.closeStream( blockIn );
1244 FileUtil.closeStream( out );
1245 }
1246
1247 return totalRead;
1248 }
1249
1250 /**
1251 * Used for transferring a block of data. This class
1252 * sends a piece of data to another DataNode.
1253 */
1254 class DataTransfer implements Runnable {
1255 DatanodeInfo targets[];
1256 Block b;
1257
1258 /**
1259 * Connect to the first item in the target list. Pass along the
1260 * entire target list, the block, and the data.
1261 */
1262 public DataTransfer(DatanodeInfo targets[], Block b) throws IOException {
1263 this.targets = targets;
1264 this.b = b;
1265 }
1266
1267 /**
1268 * Do the deed, write the bytes
1269 */
1270 public void run() {
1271 xmitsInProgress++;
1272 Socket sock = null;
1273
1274 try {
1275 InetSocketAddress curTarget =
1276 createSocketAddr(targets[0].getName());
1277 sock = new Socket();
1278 sock.connect(curTarget, READ_TIMEOUT);
1279 sock.setSoTimeout(READ_TIMEOUT);
1280 sendBlock( sock, b, 0, -1, targets );
1281 LOG.info( "Transmitted block " + b + " to " + curTarget );
1282
1283 } catch ( IOException ie ) {
1284 LOG.warn( "Failed to transfer " + b + " to " +
1285 targets[0].getName() + " got " +
1286 StringUtils.stringifyException( ie ) );
1287 } finally {
1288 FileUtil.closeSocket(sock);
1289 xmitsInProgress--;
1290 }
1291 }
1292 }
1293
1294 /**
1295 * No matter what kind of exception we get, keep retrying to offerService().
1296 * That's the loop that connects to the NameNode and provides basic DataNode
1297 * functionality.
1298 *
1299 * Only stop when "shouldRun" is turned off (which can only happen at shutdown).
1300 */
1301 public void run() {
1302 LOG.info("In DataNode.run, data = " + data);
1303
1304 // start dataXceiveServer
1305 dataXceiveServer.start();
1306
1307 while (shouldRun) {
1308 try {
1309 startDistributedUpgradeIfNeeded();
1310 offerService();
1311 } catch (Exception ex) {
1312 LOG.error("Exception: " + StringUtils.stringifyException(ex));
1313 if (shouldRun) {
1314 try {
1315 Thread.sleep(5000);
1316 } catch (InterruptedException ie) {
1317 }
1318 }
1319 }
1320 }
1321
1322 // wait for dataXceiveServer to terminate
1323 try {
1324 this.dataXceiveServer.join();
1325 } catch (InterruptedException ie) {
1326 }
1327
1328 LOG.info("Finishing DataNode in: "+data);
1329 }
1330
1331 /** Start datanode daemon.
1332 */
1333 public static DataNode run(Configuration conf) throws IOException {
1334 String[] dataDirs = conf.getStrings("dfs.data.dir");
1335 DataNode dn = makeInstance(dataDirs, conf);
1336 if (dn != null) {
1337 dataNodeThread = new Thread(dn, "DataNode: [" +
1338 StringUtils.arrayToString(dataDirs) + "]");
1339 dataNodeThread.setDaemon(true); // needed for JUnit testing
1340 dataNodeThread.start();
1341 }
1342 return dn;
1343 }
1344
1345 /** Start a single datanode daemon and wait for it to finish.
1346 * If this thread is specifically interrupted, it will stop waiting.
1347 */
1348 static DataNode createDataNode(String args[],
1349 Configuration conf) throws IOException {
1350 if (conf == null)
1351 conf = new Configuration();
1352 if (!parseArguments(args, conf)) {
1353 printUsage();
1354 return null;
1355 }
1356 return run(conf);
1357 }
1358
1359 void join() {
1360 if (dataNodeThread != null) {
1361 try {
1362 dataNodeThread.join();
1363 } catch (InterruptedException e) {}
1364 }
1365 }
1366
1367 /**
1368 * Make an instance of DataNode after ensuring that at least one of the
1369 * given data directories (and their parent directories, if necessary)
1370 * can be created.
1371 * @param dataDirs List of directories, where the new DataNode instance should
1372 * keep its files.
1373 * @param conf Configuration instance to use.
1374 * @return DataNode instance for given list of data dirs and conf, or null if
1375 * no directory from this directory list can be created.
1376 * @throws IOException
1377 */
1378 static DataNode makeInstance(String[] dataDirs, Configuration conf)
1379 throws IOException {
1380 ArrayList<File> dirs = new ArrayList<File>();
1381 for (int i = 0; i < dataDirs.length; i++) {
1382 File data = new File(dataDirs[i]);
1383 try {
1384 DiskChecker.checkDir(data);
1385 dirs.add(data);
1386 } catch(DiskErrorException e) {
1387 LOG.warn("Invalid directory in dfs.data.dir: " + e.getMessage());
1388 }
1389 }
1390 if (dirs.size() > 0)
1391 return new DataNode(conf, dirs);
1392 LOG.error("All directories in dfs.data.dir are invalid.");
1393 return null;
1394 }
1395
1396 public String toString() {
1397 return "DataNode{" +
1398 "data=" + data +
1399 ", localName='" + dnRegistration.getName() + "'" +
1400 ", storageID='" + dnRegistration.getStorageID() + "'" +
1401 ", xmitsInProgress=" + xmitsInProgress +
1402 "}";
1403 }
1404
1405 private static void printUsage() {
1406 System.err.println("Usage: java DataNode");
1407 System.err.println(" [-r, --rack <network location>] |");
1408 System.err.println(" [-rollback]");
1409 }
1410
1411 /**
1412 * Parse and verify command line arguments and set configuration parameters.
1413 *
1414 * @return false if passed argements are incorrect
1415 */
1416 private static boolean parseArguments(String args[],
1417 Configuration conf) {
1418 int argsLen = (args == null) ? 0 : args.length;
1419 StartupOption startOpt = StartupOption.REGULAR;
1420 String networkLoc = null;
1421 for(int i=0; i < argsLen; i++) {
1422 String cmd = args[i];
1423 if ("-r".equalsIgnoreCase(cmd) || "--rack".equalsIgnoreCase(cmd)) {
1424 if (i==args.length-1)
1425 return false;
1426 networkLoc = args[++i];
1427 if (networkLoc.startsWith("-"))
1428 return false;
1429 } else if ("-rollback".equalsIgnoreCase(cmd)) {
1430 startOpt = StartupOption.ROLLBACK;
1431 } else if ("-regular".equalsIgnoreCase(cmd)) {
1432 startOpt = StartupOption.REGULAR;
1433 } else
1434 return false;
1435 }
1436 if (networkLoc != null)
1437 conf.set("dfs.datanode.rack", NodeBase.normalize(networkLoc));
1438 setStartupOption(conf, startOpt);
1439 return true;
1440 }
1441
1442 private static void setStartupOption(Configuration conf, StartupOption opt) {
1443 conf.set("dfs.datanode.startup", opt.toString());
1444 }
1445
1446 static StartupOption getStartupOption(Configuration conf) {
1447 return StartupOption.valueOf(conf.get("dfs.datanode.startup",
1448 StartupOption.REGULAR.toString()));
1449 }
1450
1451 /* Get the network location by running a script configured in conf */
1452 private static String getNetworkLoc(Configuration conf)
1453 throws IOException {
1454 String locScript = conf.get("dfs.network.script");
1455 if (locScript == null)
1456 return NetworkTopology.DEFAULT_RACK;
1457
1458 LOG.info("Starting to run script to get datanode network location");
1459 Process p = Runtime.getRuntime().exec(locScript);
1460 StringBuffer networkLoc = new StringBuffer();
1461 final BufferedReader inR = new BufferedReader(
1462 new InputStreamReader(p.getInputStream()));
1463 final BufferedReader errR = new BufferedReader(
1464 new InputStreamReader(p.getErrorStream()));
1465
1466 // read & log any error messages from the running script
1467 Thread errThread = new Thread() {
1468 public void start() {
1469 try {
1470 String errLine = errR.readLine();
1471 while(errLine != null) {
1472 LOG.warn("Network script error: "+errLine);
1473 errLine = errR.readLine();
1474 }
1475 } catch(IOException e) {
1476
1477 }
1478 }
1479 };
1480 try {
1481 errThread.start();
1482
1483 // fetch output from the process
1484 String line = inR.readLine();
1485 while(line != null) {
1486 networkLoc.append(line);
1487 line = inR.readLine();
1488 }
1489 try {
1490 // wait for the process to finish
1491 int returnVal = p.waitFor();
1492 // check the exit code
1493 if (returnVal != 0) {
1494 throw new IOException("Process exits with nonzero status: "+locScript);
1495 }
1496 } catch (InterruptedException e) {
1497 throw new IOException(e.getMessage());
1498 } finally {
1499 try {
1500 // make sure that the error thread exits
1501 errThread.join();
1502 } catch (InterruptedException je) {
1503 LOG.warn(StringUtils.stringifyException(je));
1504 }
1505 }
1506 } finally {
1507 // close in & error streams
1508 try {
1509 inR.close();
1510 } catch (IOException ine) {
1511 throw ine;
1512 } finally {
1513 errR.close();
1514 }
1515 }
1516
1517 return networkLoc.toString();
1518 }
1519
1520 /**
1521 */
1522 public static void main(String args[]) {
1523 try {
1524 StringUtils.startupShutdownMessage(DataNode.class, args, LOG);
1525 DataNode datanode = createDataNode(args, null);
1526 if (datanode != null)
1527 datanode.join();
1528 } catch (Throwable e) {
1529 LOG.error(StringUtils.stringifyException(e));
1530 System.exit(-1);
1531 }
1532 }
1533
1534 }