Save This Page
Home » hadoop-0.14.4 » org.apache » hadoop » dfs » [javadoc | source]
    1   /**
    2    * Licensed to the Apache Software Foundation (ASF) under one
    3    * or more contributor license agreements.  See the NOTICE file
    4    * distributed with this work for additional information
    5    * regarding copyright ownership.  The ASF licenses this file
    6    * to you under the Apache License, Version 2.0 (the
    7    * "License"); you may not use this file except in compliance
    8    * with the License.  You may obtain a copy of the License at
    9    *
   10    *     http://www.apache.org/licenses/LICENSE-2.0
   11    *
   12    * Unless required by applicable law or agreed to in writing, software
   13    * distributed under the License is distributed on an "AS IS" BASIS,
   14    * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
   15    * See the License for the specific language governing permissions and
   16    * limitations under the License.
   17    */
   18   package org.apache.hadoop.dfs;
   19   
   20   import org.apache.commons.logging;
   21   
   22   import org.apache.hadoop.fs.Path;
   23   import org.apache.hadoop.ipc;
   24   import org.apache.hadoop.conf;
   25   import org.apache.hadoop.metrics.MetricsUtil;
   26   import org.apache.hadoop.net.DNS;
   27   import org.apache.hadoop.net.NodeBase;
   28   import org.apache.hadoop.util;
   29   import org.apache.hadoop.util.DiskChecker.DiskErrorException;
   30   import org.apache.hadoop.util.DiskChecker.DiskOutOfSpaceException;
   31   import org.apache.hadoop.mapred.StatusHttpServer;
   32   import org.apache.hadoop.net.NetworkTopology;
   33   import org.apache.hadoop.dfs.BlockCommand;
   34   import org.apache.hadoop.dfs.DatanodeProtocol;
   35   import org.apache.hadoop.fs.FileUtil;
   36   
   37   import java.io;
   38   import java.net;
   39   import java.util;
   40   import org.apache.hadoop.metrics.MetricsContext;
   41   import org.apache.hadoop.metrics.MetricsRecord;
   42   import org.apache.hadoop.metrics.Updater;
   43   import org.apache.hadoop.metrics.jvm.JvmMetrics;
   44   
   45   /**********************************************************
   46    * DataNode is a class (and program) that stores a set of
   47    * blocks for a DFS deployment.  A single deployment can
   48    * have one or many DataNodes.  Each DataNode communicates
   49    * regularly with a single NameNode.  It also communicates
   50    * with client code and other DataNodes from time to time.
   51    *
   52    * DataNodes store a series of named blocks.  The DataNode
   53    * allows client code to read these blocks, or to write new
   54    * block data.  The DataNode may also, in response to instructions
   55    * from its NameNode, delete blocks or copy blocks to/from other
   56    * DataNodes.
   57    *
   58    * The DataNode maintains just one critical table:
   59    *   block-> stream of bytes (of BLOCK_SIZE or less)
   60    *
   61    * This info is stored on a local disk.  The DataNode
   62    * reports the table's contents to the NameNode upon startup
   63    * and every so often afterwards.
   64    *
   65    * DataNodes spend their lives in an endless loop of asking
   66    * the NameNode for something to do.  A NameNode cannot connect
   67    * to a DataNode directly; a NameNode simply returns values from
   68    * functions invoked by a DataNode.
   69    *
   70    * DataNodes maintain an open server socket so that client code 
   71    * or other DataNodes can read/write data.  The host/port for
   72    * this server is reported to the NameNode, which then sends that
   73    * information to clients or other DataNodes that might be interested.
   74    *
   75    **********************************************************/
   76   public class DataNode implements FSConstants, Runnable {
   77     public static final Log LOG = LogFactory.getLog("org.apache.hadoop.dfs.DataNode");
   78   
   79     /**
   80      * Util method to build socket addr from either:
   81      *   <host>:<post>
   82      *   <fs>://<host>:<port>/<path>
   83      */
   84     public static InetSocketAddress createSocketAddr(String target
   85                                                      ) throws IOException {
   86       int colonIndex = target.indexOf(':');
   87       if (colonIndex < 0) {
   88         throw new RuntimeException("Not a host:port pair: " + target);
   89       }
   90       String hostname;
   91       int port;
   92       if (!target.contains("/")) {
   93         // must be the old style <host>:<port>
   94         hostname = target.substring(0, colonIndex);
   95         port = Integer.parseInt(target.substring(colonIndex + 1));
   96       } else {
   97         // a new uri
   98         URI addr = new Path(target).toUri();
   99         hostname = addr.getHost();
  100         port = addr.getPort();
  101       }
  102   
  103       return new InetSocketAddress(hostname, port);
  104     }
  105   
  // RPC proxy used for every call to the name-node; created in startDataNode().
  DatanodeProtocol namenode = null;
  // Local block store; created in startDataNode() from the recovered storage.
  FSDataset data = null;
  // Registration sent with name-node calls; replaced by the value that
  // namenode.register() returns.
  DatanodeRegistration dnRegistration = null;
  // Value of "dfs.datanode.rack", or the result of getNetworkLoc(conf) when unset.
  private String networkLoc;
  // Loop flag tested by the service threads; shutdown() clears it.
  volatile boolean shouldRun = true;
  // Blocks not yet reported through namenode.blockReceived(); offerService()
  // also uses this list as the monitor it waits on between iterations.
  LinkedList<Block> receivedBlockList = new LinkedList<Block>();
  // Sent to the name-node with every heartbeat.
  int xmitsInProgress = 0;
  // Daemon wrapping the DataXceiveServer that accepts data connections.
  Daemon dataXceiveServer = null;
  // Block report period; logged by offerService() as milliseconds.
  long blockReportInterval;
  // Compared against System.currentTimeMillis() in offerService(); 0 means
  // no block report has been sent since the last (re-)registration.
  long lastBlockReport = 0;
  // Compared against System.currentTimeMillis() in offerService().
  long lastHeartbeat = 0;
  // "dfs.heartbeat.interval" multiplied by 1000.
  long heartBeatInterval;
  private DataStorage storage = null;
  // HTTP server that hosts the /streamFile servlet.
  private StatusHttpServer infoServer = null;
  private DataNodeMetrics myMetrics;
  // Name-node address shared by all instances; set in startDataNode().
  private static InetSocketAddress nameNodeAddr;
  // Most recently constructed DataNode; returned by getDataNode().
  private static DataNode datanodeObject = null;
  // Interrupted and joined by shutdown(); assigned outside this part of the file.
  private static Thread dataNodeThread = null;
  // Local host name obtained from DNS.getDefaultHost() in startDataNode().
  String machineName;
  // "io.bytes.per.checksum", clamped to at least 1 in startDataNode().
  int defaultBytesPerChecksum = 512;
  126   
  127     private static class DataNodeMetrics implements Updater {
  128       private final MetricsRecord metricsRecord;
  129       private int bytesWritten = 0;
  130       private int bytesRead = 0;
  131       private int blocksWritten = 0;
  132       private int blocksRead = 0;
  133       private int blocksReplicated = 0;
  134       private int blocksRemoved = 0;
  135         
  136       DataNodeMetrics(Configuration conf) {
  137         String sessionId = conf.get("session.id"); 
  138         // Initiate reporting of Java VM metrics
  139         JvmMetrics.init("DataNode", sessionId);
  140         // Create record for DataNode metrics
  141         MetricsContext context = MetricsUtil.getContext("dfs");
  142         metricsRecord = MetricsUtil.createRecord(context, "datanode");
  143         metricsRecord.setTag("sessionId", sessionId);
  144         context.registerUpdater(this);
  145       }
  146         
  147       /**
  148        * Since this object is a registered updater, this method will be called
  149        * periodically, e.g. every 5 seconds.
  150        */
  151       public void doUpdates(MetricsContext unused) {
  152         synchronized (this) {
  153           metricsRecord.incrMetric("bytes_read", bytesRead);
  154           metricsRecord.incrMetric("bytes_written", bytesWritten);
  155           metricsRecord.incrMetric("blocks_read", blocksRead);
  156           metricsRecord.incrMetric("blocks_written", blocksWritten);
  157           metricsRecord.incrMetric("blocks_replicated", blocksReplicated);
  158           metricsRecord.incrMetric("blocks_removed", blocksRemoved);
  159                 
  160           bytesWritten = 0;
  161           bytesRead = 0;
  162           blocksWritten = 0;
  163           blocksRead = 0;
  164           blocksReplicated = 0;
  165           blocksRemoved = 0;
  166         }
  167         metricsRecord.update();
  168       }
  169   
  170       synchronized void readBytes(int nbytes) {
  171         bytesRead += nbytes;
  172       }
  173         
  174       synchronized void wroteBytes(int nbytes) {
  175         bytesWritten += nbytes;
  176       }
  177         
  178       synchronized void readBlocks(int nblocks) {
  179         blocksRead += nblocks;
  180       }
  181         
  182       synchronized void wroteBlocks(int nblocks) {
  183         blocksWritten += nblocks;
  184       }
  185         
  186       synchronized void replicatedBlocks(int nblocks) {
  187         blocksReplicated += nblocks;
  188       }
  189         
  190       synchronized void removedBlocks(int nblocks) {
  191         blocksRemoved += nblocks;
  192       }
  193     }
  194       
  /**
   * Builds a DataNode from a configuration and the list of directories
   * in which blocks are stored, and starts it right away.  If startup
   * fails with an IOException the node is shut down again before the
   * exception is passed on to the caller.
   */
  DataNode(Configuration conf, 
           AbstractList<File> dataDirs) throws IOException {
    myMetrics = new DataNodeMetrics(conf);
    // publish the instance before startup so that getDataNode() returns it
    datanodeObject = this;

    try {
      startDataNode(conf, dataDirs);
    } catch (IOException startupFailure) {
      shutdown();
      throw startupFailure;
    }
  }
  212       
  213     void startDataNode(Configuration conf, 
  214                        AbstractList<File> dataDirs
  215                        ) throws IOException {
  216       // use configured nameserver & interface to get local hostname
  217       machineName = DNS.getDefaultHost(
  218                                        conf.get("dfs.datanode.dns.interface","default"),
  219                                        conf.get("dfs.datanode.dns.nameserver","default"));
  220       InetSocketAddress nameNodeAddr = createSocketAddr(
  221                                                         conf.get("fs.default.name", "local"));
  222       
  223       this.defaultBytesPerChecksum = 
  224          Math.max(conf.getInt("io.bytes.per.checksum", 512), 1); 
  225       
  226       int tmpPort = conf.getInt("dfs.datanode.port", 50010);
  227       storage = new DataStorage();
  228       // construct registration
  229       this.dnRegistration = new DatanodeRegistration(machineName + ":" + tmpPort);
  230   
  231       // connect to name node
  232       this.namenode = (DatanodeProtocol) 
  233         RPC.waitForProxy(DatanodeProtocol.class,
  234                          DatanodeProtocol.versionID,
  235                          nameNodeAddr, 
  236                          conf);
  237       // get version and id info from the name-node
  238       NamespaceInfo nsInfo = handshake();
  239   
  240       // read storage info, lock data dirs and transition fs state if necessary
  241       StartupOption startOpt = getStartupOption(conf);
  242       assert startOpt != null : "Startup option must be set.";
  243       storage.recoverTransitionRead(nsInfo, dataDirs, startOpt);
  244       // adjust
  245       this.dnRegistration.setStorageInfo(storage);
  246         
  247       // initialize data node internal structure
  248       this.data = new FSDataset(storage, conf);
  249         
  250       // find free port
  251       ServerSocket ss = null;
  252       String bindAddress = conf.get("dfs.datanode.bindAddress", "0.0.0.0");
  253       while (ss == null) {
  254         try {
  255           ss = new ServerSocket(tmpPort, 0, InetAddress.getByName(bindAddress));
  256           LOG.info("Opened server at " + tmpPort);
  257         } catch (IOException ie) {
  258           LOG.info("Could not open server at " + tmpPort + ", trying new port");
  259           tmpPort++;
  260         }
  261       }
  262       // adjust machine name with the actual port
  263       this.dnRegistration.setName(machineName + ":" + tmpPort);
  264         
  265       this.dataXceiveServer = new Daemon(new DataXceiveServer(ss));
  266   
  267       long blockReportIntervalBasis =
  268         conf.getLong("dfs.blockreport.intervalMsec", BLOCKREPORT_INTERVAL);
  269       this.blockReportInterval =
  270         blockReportIntervalBasis - new Random().nextInt((int)(blockReportIntervalBasis/10));
  271       this.heartBeatInterval = conf.getLong("dfs.heartbeat.interval", HEARTBEAT_INTERVAL) * 1000L;
  272       DataNode.nameNodeAddr = nameNodeAddr;
  273   
  274       //create a servlet to serve full-file content
  275       int infoServerPort = conf.getInt("dfs.datanode.info.port", 50075);
  276       String infoServerBindAddress = conf.get("dfs.datanode.info.bindAddress", "0.0.0.0");
  277       this.infoServer = new StatusHttpServer("datanode", infoServerBindAddress, infoServerPort, true);
  278       this.infoServer.addServlet(null, "/streamFile/*", StreamFile.class);
  279       this.infoServer.start();
  280       // adjust info port
  281       this.dnRegistration.setInfoPort(this.infoServer.getPort());
  282       // get network location
  283       this.networkLoc = conf.get("dfs.datanode.rack");
  284       if (networkLoc == null)  // exec network script or set the default rack
  285         networkLoc = getNetworkLoc(conf);
  286       // register datanode
  287       register();
  288     }
  289   
  290     private NamespaceInfo handshake() throws IOException {
  291       NamespaceInfo nsInfo = new NamespaceInfo();
  292       while (shouldRun) {
  293         try {
  294           nsInfo = namenode.versionRequest();
  295           break;
  296         } catch(SocketTimeoutException e) {  // namenode is busy
  297           LOG.info("Problem connecting to server: " + getNameNodeAddr());
  298           try {
  299             Thread.sleep(1000);
  300           } catch (InterruptedException ie) {}
  301         }
  302       }
  303       String errorMsg = null;
  304       // verify build version
  305       if( ! nsInfo.getBuildVersion().equals( Storage.getBuildVersion() )) {
  306         errorMsg = "Incompatible build versions: namenode BV = " 
  307           + nsInfo.getBuildVersion() + "; datanode BV = "
  308           + Storage.getBuildVersion();
  309         LOG.fatal( errorMsg );
  310         try {
  311           namenode.errorReport( dnRegistration,
  312                                 DatanodeProtocol.NOTIFY, errorMsg );
  313         } catch( SocketTimeoutException e ) {  // namenode is busy
  314           LOG.info("Problem connecting to server: " + getNameNodeAddr());
  315         }
  316         throw new IOException( errorMsg );
  317       }
  318       assert FSConstants.LAYOUT_VERSION == nsInfo.getLayoutVersion() :
  319         "Data-node and name-node layout versions must be the same.";
  320       return nsInfo;
  321     }
  322   
  323     /** Return the DataNode object
  324      * 
  325      */
  326     public static DataNode getDataNode() {
  327       return datanodeObject;
  328     } 
  329   
  330     public InetSocketAddress getNameNodeAddr() {
  331       return nameNodeAddr;
  332     }
  333       
  334     /**
  335      * Return the namenode's identifier
  336      */
  337     public String getNamenode() {
  338       //return namenode.toString();
  339       return "<namenode>";
  340     }
  341   
  342     /**
  343      * Register datanode
  344      * <p>
  345      * The datanode needs to register with the namenode on startup in order
  346      * 1) to report which storage it is serving now and 
  347      * 2) to receive a registrationID 
  348      * issued by the namenode to recognize registered datanodes.
  349      * 
  350      * @see FSNamesystem#registerDatanode(DatanodeRegistration,String)
  351      * @throws IOException
  352      */
  353     private void register() throws IOException {
  354       while(shouldRun) {
  355         try {
  356           // reset name to machineName. Mainly for web interface.
  357           dnRegistration.name = machineName + ":" + dnRegistration.getPort();
  358           dnRegistration = namenode.register(dnRegistration, networkLoc);
  359           break;
  360         } catch(SocketTimeoutException e) {  // namenode is busy
  361           LOG.info("Problem connecting to server: " + getNameNodeAddr());
  362           try {
  363             Thread.sleep(1000);
  364           } catch (InterruptedException ie) {}
  365         }
  366       }
  367       assert ("".equals(storage.getStorageID()) 
  368               && !"".equals(dnRegistration.getStorageID()))
  369               || storage.getStorageID().equals(dnRegistration.getStorageID()) :
  370               "New storageID can be assigned only if data-node is not formatted";
  371       if (storage.getStorageID().equals("")) {
  372         storage.setStorageID(dnRegistration.getStorageID());
  373         storage.writeAll();
  374         LOG.info("New storage id " + dnRegistration.getStorageID()
  375             + " is assigned to data-node " + dnRegistration.getName());
  376       }
  377       if(! storage.getStorageID().equals(dnRegistration.getStorageID())) {
  378         throw new IOException("Inconsistent storage IDs. Name-node returned "
  379             + dnRegistration.getStorageID() 
  380             + ". Expecting " + storage.getStorageID());
  381       }
  382     }
  383   
  384     /**
  385      * Shut down this instance of the datanode.
  386      * Returns only after shutdown is complete.
  387      */
  388     public void shutdown() {
  389       if (infoServer != null) {
  390         try {
  391           infoServer.stop();
  392         } catch (Exception e) {
  393         }
  394       }
  395       this.shouldRun = false;
  396       if (dataXceiveServer != null) {
  397         ((DataXceiveServer) this.dataXceiveServer.getRunnable()).kill();
  398         this.dataXceiveServer.interrupt();
  399       }
  400       if(upgradeManager != null)
  401         upgradeManager.shutdownUpgrade();
  402       if (storage != null) {
  403         try {
  404           this.storage.unlockAll();
  405         } catch (IOException ie) {
  406         }
  407       }
  408       if (dataNodeThread != null) {
  409         dataNodeThread.interrupt();
  410         try {
  411           dataNodeThread.join();
  412         } catch (InterruptedException ie) {
  413         }
  414       }
  415     }
  416     
  417     
  418     /* Check if there is no space in disk or the disk is read-only
  419      *  when IOException occurs. 
  420      * If so, handle the error */
  421     private void checkDiskError( IOException e ) throws IOException {
  422       if (e.getMessage().startsWith("No space left on device")) {
  423         throw new DiskOutOfSpaceException("No space left on device");
  424       } else {
  425         checkDiskError();
  426       }
  427     }
  428     
  429     /* Check if there is no disk space and if so, handle the error*/
  430     private void checkDiskError( ) throws IOException {
  431       try {
  432         data.checkDataDir();
  433       } catch(DiskErrorException de) {
  434         handleDiskError(de.getMessage());
  435       }
  436     }
  437     
  438     private void handleDiskError(String errMsgr) {
  439       LOG.warn("DataNode is shutting down.\n" + errMsgr);
  440       try {
  441         namenode.errorReport(
  442                              dnRegistration, DatanodeProtocol.DISK_ERROR, errMsgr);
  443       } catch(IOException ignored) {              
  444       }
  445       shutdown();
  446     }
  447       
  448     private static class Count {
  449       int value = 0;
  450       Count(int init) { value = init; }
  451       synchronized void incr() { value++; }
  452       synchronized void decr() { value--; }
  453       public String toString() { return Integer.toString(value); }
  454       public int getValue() { return value; }
  455     }
  456       
  457     Count xceiverCount = new Count(0);
  458       
  459     /**
  460      * Main loop for the DataNode.  Runs until shutdown,
  461      * forever calling remote NameNode functions.
  462      */
  463     public void offerService() throws Exception {
  464        
  465       LOG.info("using BLOCKREPORT_INTERVAL of " + blockReportInterval + "msec");
  466   
  467       //
  468       // Now loop for a long time....
  469       //
  470   
  471       while (shouldRun) {
  472         try {
  473           long now = System.currentTimeMillis();
  474   
  475           //
  476           // Every so often, send heartbeat or block-report
  477           //
  478           if (now - lastHeartbeat > heartBeatInterval) {
  479             //
  480             // All heartbeat messages include following info:
  481             // -- Datanode name
  482             // -- data transfer port
  483             // -- Total capacity
  484             // -- Bytes remaining
  485             //
  486             DatanodeCommand cmd = namenode.sendHeartbeat(dnRegistration, 
  487                                                          data.getCapacity(), 
  488                                                          data.getRemaining(), 
  489                                                          xmitsInProgress,
  490                                                          xceiverCount.getValue());
  491             //LOG.info("Just sent heartbeat, with name " + localName);
  492             lastHeartbeat = now;
  493             if (!processCommand(cmd))
  494               continue;
  495           }
  496               
  497           // check if there are newly received blocks
  498           Block [] blockArray=null;
  499           synchronized(receivedBlockList) {
  500             if (receivedBlockList.size() > 0) {
  501               //
  502               // Send newly-received blockids to namenode
  503               //
  504               blockArray = receivedBlockList.toArray(new Block[receivedBlockList.size()]);
  505             }
  506           }
  507           if (blockArray != null) {
  508             namenode.blockReceived(dnRegistration, blockArray);
  509             synchronized (receivedBlockList) {
  510               for(Block b: blockArray) {
  511                 receivedBlockList.remove(b);
  512               }
  513             }
  514           }
  515   
  516           // send block report
  517           if (now - lastBlockReport > blockReportInterval) {
  518             //
  519             // Send latest blockinfo report if timer has expired.
  520             // Get back a list of local block(s) that are obsolete
  521             // and can be safely GC'ed.
  522             //
  523             DatanodeCommand cmd = namenode.blockReport(dnRegistration,
  524                                                        data.getBlockReport());
  525             //
  526             // If we have sent the first block report, then wait a random
  527             // time before we start the periodic block reports.
  528             //
  529             if (lastBlockReport == 0) {
  530               lastBlockReport = now - new Random().nextInt((int)(blockReportInterval));
  531             } else {
  532               lastBlockReport = now;
  533             }
  534             processCommand(cmd);
  535           }
  536               
  537           //
  538           // There is no work to do;  sleep until hearbeat timer elapses, 
  539           // or work arrives, and then iterate again.
  540           //
  541           long waitTime = heartBeatInterval - (System.currentTimeMillis() - lastHeartbeat);
  542           synchronized(receivedBlockList) {
  543             if (waitTime > 0 && receivedBlockList.size() == 0) {
  544               try {
  545                 receivedBlockList.wait(waitTime);
  546               } catch (InterruptedException ie) {
  547               }
  548             }
  549           } // synchronized
  550         } catch(RemoteException re) {
  551           String reClass = re.getClassName();
  552           if (UnregisteredDatanodeException.class.getName().equals(reClass) ||
  553               DisallowedDatanodeException.class.getName().equals(reClass)) {
  554             LOG.warn("DataNode is shutting down: " + 
  555                      StringUtils.stringifyException(re));
  556             shutdown();
  557             return;
  558           }
  559           LOG.warn(StringUtils.stringifyException(re));
  560         } catch (IOException e) {
  561           LOG.warn(StringUtils.stringifyException(e));
  562         }
  563       } // while (shouldRun)
  564     } // offerService
  565   
  566       /**
  567        * 
  568        * @param cmd
  569        * @return true if further processing may be required or false otherwise. 
  570        * @throws IOException
  571        */
  572     private boolean processCommand(DatanodeCommand cmd) throws IOException {
  573       if (cmd == null)
  574         return true;
  575       switch(cmd.getAction()) {
  576       case DatanodeProtocol.DNA_TRANSFER:
  577         //
  578         // Send a copy of a block to another datanode
  579         //
  580         BlockCommand bcmd = (BlockCommand)cmd;
  581         transferBlocks(bcmd.getBlocks(), bcmd.getTargets());
  582         break;
  583       case DatanodeProtocol.DNA_INVALIDATE:
  584         //
  585         // Some local block(s) are obsolete and can be 
  586         // safely garbage-collected.
  587         //
  588         Block toDelete[] = ((BlockCommand)cmd).getBlocks();
  589         try {
  590           data.invalidate(toDelete);
  591         } catch(IOException e) {
  592           checkDiskError();
  593           throw e;
  594         }
  595         myMetrics.removedBlocks(toDelete.length);
  596         break;
  597       case DatanodeProtocol.DNA_SHUTDOWN:
  598         // shut down the data node
  599         this.shutdown();
  600         return false;
  601       case DatanodeProtocol.DNA_REGISTER:
  602         // namenode requested a registration
  603         register();
  604         lastHeartbeat=0;
  605         lastBlockReport=0;
  606         break;
  607       case DatanodeProtocol.DNA_FINALIZE:
  608         storage.finalizeUpgrade();
  609         break;
  610       case UpgradeCommand.UC_ACTION_START_UPGRADE:
  611         // start distributed upgrade here
  612         processDistributedUpgradeCommand((UpgradeCommand)cmd);
  613         break;
  614       default:
  615         LOG.warn("Unknown DatanodeCommand action: " + cmd.getAction());
  616       }
  617       return true;
  618     }
  619   
  620     // Distributed upgrade manager
  621     UpgradeManagerDatanode upgradeManager = new UpgradeManagerDatanode(this);
  622   
  623     private void processDistributedUpgradeCommand(UpgradeCommand comm
  624                                                  ) throws IOException {
  625       assert upgradeManager != null : "DataNode.upgradeManager is null.";
  626       upgradeManager.processUpgradeCommand(comm);
  627     }
  628   
  629   
  630     /**
  631      * Start distributed upgrade if it should be initiated by the data-node.
  632      */
  633     private void startDistributedUpgradeIfNeeded() throws IOException {
  634       UpgradeManagerDatanode um = DataNode.getDataNode().upgradeManager;
  635       assert um != null : "DataNode.upgradeManager is null.";
  636       if(!um.getUpgradeState())
  637         return;
  638       um.setUpgradeState(false, um.getUpgradeVersion());
  639       um.startUpgrade();
  640       return;
  641     }
  642     private void transferBlocks( Block blocks[], 
  643                                  DatanodeInfo xferTargets[][] 
  644                                  ) throws IOException {
  645       for (int i = 0; i < blocks.length; i++) {
  646         if (!data.isValidBlock(blocks[i])) {
  647           String errStr = "Can't send invalid block " + blocks[i];
  648           LOG.info(errStr);
  649           namenode.errorReport(dnRegistration, 
  650                                DatanodeProtocol.INVALID_BLOCK, 
  651                                errStr);
  652           break;
  653         }
  654         if (xferTargets[i].length > 0) {
  655           LOG.info("Starting thread to transfer block " + blocks[i] + " to " + xferTargets[i]);
  656           new Daemon(new DataTransfer(xferTargets[i], blocks[i])).start();
  657         }
  658       }
  659     }
  660       
  661     /**
  662      * Server used for receiving/sending a block of data.
  663      * This is created to listen for requests from clients or 
  664      * other DataNodes.  This small server does not use the 
  665      * Hadoop IPC mechanism.
  666      */
  667     class DataXceiveServer implements Runnable {
  668       ServerSocket ss;
  669       public DataXceiveServer(ServerSocket ss) {
  670         this.ss = ss;
  671       }
  672   
  673       /**
  674        */
  675       public void run() {
  676         try {
  677           while (shouldRun) {
  678             Socket s = ss.accept();
  679             //s.setSoTimeout(READ_TIMEOUT);
  680             xceiverCount.incr();
  681             new Daemon(new DataXceiver(s)).start();
  682           }
  683           ss.close();
  684         } catch (IOException ie) {
  685           LOG.info("Exiting DataXceiveServer due to " + ie.toString());
  686         }
  687       }
  688       public void kill() {
  689         assert shouldRun == false :
  690           "shoudRun should be set to false before killing";
  691         try {
  692           this.ss.close();
  693         } catch (IOException iex) {
  694         }
  695       }
  696     }
  697   
  698     /**
  699      * Thread for processing incoming/outgoing data stream
  700      */
  701     class DataXceiver implements Runnable {
  702       Socket s;
  703       public DataXceiver(Socket s) {
  704         this.s = s;
  705         LOG.debug("Number of active connections is: "+xceiverCount);
  706       }
  707   
  708       /**
  709        * Read/write data from/to the DataXceiveServer.
  710        */
  711       public void run() {
  712         try {
  713           DataInputStream in = new DataInputStream(
  714              new BufferedInputStream(s.getInputStream(), BUFFER_SIZE));
  715           short version = in.readShort();
  716           if ( version != DATA_TRANFER_VERSION ) {
  717             throw new IOException( "Version Mismatch" );
  718           }
  719   
  720           byte op = in.readByte();
  721   
  722           switch ( op ) {
  723           case OP_READ_BLOCK:
  724             readBlock( in );
  725             break;
  726           case OP_WRITE_BLOCK:
  727             writeBlock( in );
  728             break;
  729           case OP_READ_METADATA:
  730             readMetadata( in );
  731             break;
  732           default:
  733             throw new IOException("Unknown opcode " + op + "in data stream");
  734           }
  735          } catch (Throwable t) {
  736           LOG.error("DataXceiver: " + StringUtils.stringifyException(t));
  737         } finally {
  738           try {
  739             xceiverCount.decr();
  740             LOG.debug("Number of active connections is: "+xceiverCount);
  741             s.close();
  742           } catch (IOException ie2) {
  743           }
  744         }
  745       }
  746   
  747       /**
  748        * Read a block from the disk
  749        * @param in The stream to read from
  750        * @throws IOException
  751        */
  752       private void readBlock(DataInputStream in) throws IOException {
  753         //
  754         // Read in the header
  755         //
  756         long blockId = in.readLong();          
  757         Block block = new Block( blockId, 0 );
  758   
  759         long startOffset = in.readLong();
  760         long length = in.readLong();
  761         
  762         try {
  763           //XXX Buffered output stream?
  764           long read = sendBlock(s, block, startOffset, length, null );
  765           myMetrics.readBytes((int)read);
  766           myMetrics.readBlocks(1);
  767           LOG.info("Served block " + block + " to " + s.getInetAddress());
  768         } catch ( SocketException ignored ) {
  769           // Its ok for remote side to close the connection anytime.
  770           myMetrics.readBlocks(1);
  771         } catch ( IOException ioe ) {
  772           /* What exactly should we do here?
  773            * Earlier version shutdown() datanode if there is disk error.
  774            */
  775           LOG.warn( "Got exception while serving " + block + " to " +
  776                     s.getInetAddress() + ": " + 
  777                     StringUtils.stringifyException(ioe) );
  778           throw ioe;
  779         }
  780       }
  781   
  782       /**
  783        * Write a block to disk.
  784        * @param in The stream to read from
  785        * @throws IOException
  786        */
  787       private void writeBlock(DataInputStream in) throws IOException {
  788         //
  789         // Read in the header
  790         //
  791         DataOutputStream reply = new DataOutputStream(s.getOutputStream());
  792         DataOutputStream out = null;
  793         DataOutputStream checksumOut = null;
  794         Socket mirrorSock = null;
  795         DataOutputStream mirrorOut = null;
  796         DataInputStream mirrorIn = null;
  797         
  798         try {
  799           /* We need an estimate for block size to check if the 
  800            * disk partition has enough space. For now we just increment
  801            * FSDataset.reserved by configured dfs.block.size
  802            * Other alternative is to include the block size in the header
  803            * sent by DFSClient.
  804            */
  805           Block block = new Block( in.readLong(), 0 );
  806           int numTargets = in.readInt();
  807           if ( numTargets < 0 ) {
  808             throw new IOException("Mislabelled incoming datastream.");
  809           }
  810           DatanodeInfo targets[] = new DatanodeInfo[numTargets];
  811           for (int i = 0; i < targets.length; i++) {
  812             DatanodeInfo tmp = new DatanodeInfo();
  813             tmp.readFields(in);
  814             targets[i] = tmp;
  815           }
  816               
  817           DataChecksum checksum = DataChecksum.newDataChecksum( in );
  818   
  819           //
  820           // Open local disk out
  821           //
  822           FSDataset.BlockWriteStreams streams = data.writeToBlock( block );
  823           out = new DataOutputStream(
  824                     new BufferedOutputStream(streams.dataOut, BUFFER_SIZE));        
  825           checksumOut = new DataOutputStream(
  826                     new BufferedOutputStream(streams.checksumOut, BUFFER_SIZE));
  827           
  828           InetSocketAddress mirrorTarget = null;
  829           String mirrorNode = null;
  830           //
  831           // Open network conn to backup machine, if 
  832           // appropriate
  833           //
  834           if (targets.length > 0) {
  835             // Connect to backup machine
  836             mirrorNode = targets[0].getName();
  837             mirrorTarget = createSocketAddr(mirrorNode);
  838             try {
  839               mirrorSock = new Socket();
  840               mirrorSock.connect(mirrorTarget, READ_TIMEOUT);
  841               mirrorSock.setSoTimeout(READ_TIMEOUT);
  842               mirrorOut = new DataOutputStream( 
  843                           new BufferedOutputStream(mirrorSock.getOutputStream(),
  844                                                    BUFFER_SIZE));
  845               mirrorIn = new DataInputStream( mirrorSock.getInputStream() );
  846               //Copied from DFSClient.java!
  847               mirrorOut.writeShort( DATA_TRANFER_VERSION );
  848               mirrorOut.write( OP_WRITE_BLOCK );
  849               mirrorOut.writeLong( block.getBlockId() );
  850               mirrorOut.writeInt( targets.length - 1 );
  851               for ( int i = 1; i < targets.length; i++ ) {
  852                 targets[i].write( mirrorOut );
  853               }
  854               checksum.writeHeader( mirrorOut );
  855               myMetrics.replicatedBlocks(1);
  856             } catch (IOException ie) {
  857               if (mirrorOut != null) {
  858                 LOG.info("Exception connecting to mirror " + mirrorNode 
  859                          + "\n" + StringUtils.stringifyException(ie));
  860                 mirrorOut = null;
  861               }
  862             }
  863           }
  864           
  865           // XXX The following code is similar on both sides...
  866           
  867           int bytesPerChecksum = checksum.getBytesPerChecksum();
  868           int checksumSize = checksum.getChecksumSize();
  869           byte buf[] = new byte[ bytesPerChecksum + checksumSize ];
  870           long blockLen = 0;
  871           long lastOffset = 0;
  872           long lastLen = 0;
  873           short status = -1;
  874           boolean headerWritten = false;
  875           
  876           while ( true ) {
  877             // Read one data chunk in each loop.
  878             
  879             long offset = lastOffset + lastLen;
  880             int len = (int) in.readInt();
  881             if ( len < 0 || len > bytesPerChecksum ) {
  882               LOG.warn( "Got wrong length during writeBlock(" +
  883                         block + ") from " + s.getRemoteSocketAddress() +
  884                         " at offset " + offset + ": " + len + 
  885                         " expected <= " + bytesPerChecksum );
  886               status = OP_STATUS_ERROR;
  887               break;
  888             }
  889   
  890             in.readFully( buf, 0, len + checksumSize );
  891             
  892             if ( len > 0 && checksumSize > 0 ) {
  893               /*
  894                * Verification is not included in the initial design.
  895                * For now, it at least catches some bugs. Later, we can 
  896                * include this after showing that it does not affect 
  897                * performance much.
  898                */
  899               checksum.update( buf, 0, len  );
  900               
  901               if ( ! checksum.compare( buf, len ) ) {
  902                 throw new IOException( "Unexpected checksum mismatch " +
  903                                        "while writing " + block + 
  904                                        " from " +
  905                                        s.getRemoteSocketAddress() );
  906               }
  907               
  908               checksum.reset();
  909             }
  910   
  911             // First write to remote node before writing locally.
  912             if (mirrorOut != null) {
  913               try {
  914                 mirrorOut.writeInt( len );
  915                 mirrorOut.write( buf, 0, len + checksumSize );
  916                 if (len == 0) {
  917                   mirrorOut.flush();
  918                 }
  919               } catch (IOException ioe) {
  920                 LOG.info( "Exception writing to mirror " + mirrorNode + 
  921                           "\n" + StringUtils.stringifyException(ioe) );
  922                 //
  923                 // If stream-copy fails, continue 
  924                 // writing to disk.  We shouldn't 
  925                 // interrupt client write.
  926                 //
  927                 mirrorOut = null;
  928               }
  929             }
  930   
  931             try {
  932               if ( !headerWritten ) { 
  933                 // First DATA_CHUNK. 
  934                 // Write the header even if checksumSize is 0.
  935                 checksumOut.writeShort( FSDataset.METADATA_VERSION );
  936                 checksum.writeHeader( checksumOut );
  937                 headerWritten = true;
  938               }
  939               
  940               if ( len > 0 ) {
  941                 out.write( buf, 0, len );
  942                 // Write checksum
  943                 checksumOut.write( buf, len, checksumSize );
  944                 myMetrics.wroteBytes( len );
  945               } else {
  946                 /* Should we sync() files here? It can add many millisecs of
  947                  * latency. We did not sync before HADOOP-1134 either.
  948                  */ 
  949                 out.close();
  950                 out = null;
  951                 checksumOut.close();
  952                 checksumOut = null;
  953               }
  954               
  955             } catch (IOException iex) {
  956               checkDiskError(iex);
  957               throw iex;
  958             }
  959             
  960             if ( len == 0 ) {
  961   
  962               // We already have one successful write here. Should we
  963               // wait for response from next target? We will skip for now.
  964   
  965               block.setNumBytes( blockLen );
  966               
  967               //Does this fsync()?
  968               data.finalizeBlock( block );
  969               myMetrics.wroteBlocks(1);
  970               
  971               status = OP_STATUS_SUCCESS;
  972               
  973               break;
  974             }
  975             
  976             if ( lastLen > 0 && lastLen != bytesPerChecksum ) {
  977               LOG.warn( "Got wrong length during writeBlock(" +
  978                         block + ") from " + s.getRemoteSocketAddress() +
  979                         " : " + " got " + lastLen + " instead of " +
  980                         bytesPerChecksum );
  981               status = OP_STATUS_ERROR;
  982               break;
  983             }
  984             
  985             lastOffset = offset;
  986             lastLen = len;
  987             blockLen += len;
  988           }
  989           // done with reading the data.
  990           
  991           if ( status == OP_STATUS_SUCCESS ) {
  992             /* Informing the name node could take a long long time!
  993                Should we wait till namenode is informed before responding
  994                with success to the client? For now we don't.
  995             */
  996             synchronized ( receivedBlockList ) {
  997               receivedBlockList.add( block );
  998               receivedBlockList.notifyAll();
  999             }
 1000             
 1001             String msg = "Received block " + block + " from " + 
 1002                          s.getInetAddress();
 1003             
 1004             if ( mirrorOut != null ) {
 1005               //Wait for the remote reply
 1006               mirrorOut.flush();
 1007               short result = OP_STATUS_ERROR; 
 1008               try {
 1009                 result = mirrorIn.readShort();
 1010               } catch ( IOException ignored ) {}
 1011   
 1012               msg += " and " +  (( result != OP_STATUS_SUCCESS ) ? 
 1013                                   "failed to mirror to " : " mirrored to ") +
 1014                      mirrorTarget;
 1015               
 1016               mirrorOut = null;
 1017             }
 1018             
 1019             LOG.info(msg);
 1020           }
 1021               
 1022           if ( status >= 0 ) {
 1023             try {
 1024               reply.writeShort( status );
 1025               reply.flush();
 1026             } catch ( IOException ignored ) {}
 1027           }
 1028           
 1029         } finally {
 1030           try {
 1031             if ( out != null )
 1032               out.close();
 1033             if ( checksumOut != null )
 1034               checksumOut.close();
 1035             if ( mirrorSock != null )
 1036               mirrorSock.close();
 1037           } catch (IOException iex) {
 1038             shutdown();
 1039             throw iex;
 1040           }
 1041         }
 1042       }
 1043       
 1044       /**
 1045        * Reads the metadata and sends the data in one 'DATA_CHUNK'
 1046        * @param in
 1047        */
 1048       void readMetadata(DataInputStream in) throws IOException {
 1049         
 1050         Block block = new Block( in.readLong(), 0 );
 1051         InputStream checksumIn = null;
 1052         DataOutputStream out = null;
 1053         
 1054         try {
 1055           File blockFile = data.getBlockFile( block );
 1056           File checksumFile = FSDataset.getMetaFile( blockFile );
 1057           checksumIn = new FileInputStream(checksumFile);
 1058   
 1059           long fileSize = checksumFile.length();
 1060           if (fileSize >= 1L<<31 || fileSize <= 0) {
 1061             throw new IOException("Unexpected size for checksumFile " +
 1062                                   checksumFile);
 1063           }
 1064   
 1065           byte [] buf = new byte[(int)fileSize];
 1066           FileUtil.readFully(checksumIn, buf, 0, buf.length);
 1067           
 1068           out = new DataOutputStream(s.getOutputStream());
 1069           
 1070           out.writeByte(OP_STATUS_SUCCESS);
 1071           out.writeInt(buf.length);
 1072           out.write(buf);
 1073           
 1074           //last DATA_CHUNK
 1075           out.writeInt(0);
 1076         } finally {
 1077           FileUtil.closeStream(checksumIn);
 1078         }
 1079       }
 1080     }
 1081   
 1082     /** sendBlock() is used to read block and its metadata and stream
 1083      * the data to either a client or to another datanode.
 1084      * If argument targets is null, then it is assumed to be replying
 1085      * to a client request (OP_BLOCK_READ). Otherwise, we are replicating
 1086      * to another datanode.
 1087      * 
 1088      * returns total bytes reads, including crc.
 1089      */
 1090     long sendBlock(Socket sock, Block block,
 1091                    long startOffset, long length, DatanodeInfo targets[] )
 1092                    throws IOException {
 1093       DataOutputStream out = new DataOutputStream(
 1094                              new BufferedOutputStream(sock.getOutputStream(), 
 1095                                                       BUFFER_SIZE));
 1096       RandomAccessFile blockInFile = null;
 1097       DataInputStream blockIn = null;
 1098       DataInputStream checksumIn = null;
 1099       long totalRead = 0;    
 1100   
 1101       /* XXX This will affect inter datanode transfers during 
 1102        * a CRC upgrade. There should not be any replication
 1103        * during crc upgrade since we are in safe mode, right?
 1104        */    
 1105       boolean corruptChecksumOk = targets == null; 
 1106   
 1107       try {
 1108         File blockFile = data.getBlockFile( block );
 1109         blockInFile = new RandomAccessFile(blockFile, "r");
 1110   
 1111         File checksumFile = FSDataset.getMetaFile( blockFile );
 1112         DataChecksum checksum = null;
 1113   
 1114         if ( !corruptChecksumOk || checksumFile.exists() ) {
 1115           checksumIn = new DataInputStream(
 1116                        new BufferedInputStream(new FileInputStream(checksumFile),
 1117                                                BUFFER_SIZE));
 1118             
 1119           //read and handle the common header here. For now just a version
 1120           short version = checksumIn.readShort();
 1121           if ( version != FSDataset.METADATA_VERSION ) {
 1122             LOG.warn( "Wrong version (" + version + 
 1123                       ") for metadata file for " + block + " ignoring ..." );
 1124           }
 1125           checksum = DataChecksum.newDataChecksum( checksumIn ) ;
 1126         } else {
 1127           LOG.warn( "Could not find metadata file for " + block );
 1128           // This only decides the buffer size. Use BUFFER_SIZE?
 1129           checksum = DataChecksum.newDataChecksum( DataChecksum.CHECKSUM_NULL,
 1130                                                    16*1024 );
 1131         }
 1132   
 1133         int bytesPerChecksum = checksum.getBytesPerChecksum();
 1134         int checksumSize = checksum.getChecksumSize();
 1135         
 1136         if (length < 0) {
 1137           length = data.getLength(block);
 1138         }
 1139   
 1140         long endOffset = data.getLength( block );
 1141         if ( startOffset < 0 || startOffset > endOffset ||
 1142             (length + startOffset) > endOffset ) {
 1143           String msg = " Offset " + startOffset + " and length " + length + 
 1144                        " don't match block " + block +  " ( blockLen " + 
 1145                        endOffset + " )"; 
 1146           LOG.warn( "sendBlock() : " + msg );
 1147           if ( targets != null ) {
 1148             throw new IOException(msg);
 1149           } else {
 1150             out.writeShort( OP_STATUS_ERROR_INVALID );
 1151             return totalRead;
 1152           }
 1153         }
 1154   
 1155         byte buf[] = new byte[ bytesPerChecksum + checksumSize ];
 1156         long offset = (startOffset - (startOffset % bytesPerChecksum));
 1157         if ( length >= 0 ) {
 1158           // Make sure endOffset points to end of a checksumed chunk. 
 1159           long tmpLen = startOffset + length + (startOffset - offset);
 1160           if ( tmpLen % bytesPerChecksum != 0 ) { 
 1161             tmpLen += ( bytesPerChecksum - tmpLen % bytesPerChecksum );
 1162           }
 1163           if ( tmpLen < endOffset ) {
 1164             endOffset = tmpLen;
 1165           }
 1166         }
 1167   
 1168         // seek to the right offsets
 1169         if ( offset > 0 ) {
 1170           long checksumSkip = ( offset / bytesPerChecksum ) * checksumSize ;
 1171           blockInFile.seek(offset);
 1172           if (checksumSkip > 0) {
 1173             //Should we use seek() for checksum file as well?
 1174             FileUtil.skipFully(checksumIn, checksumSkip);
 1175           }
 1176         }
 1177         
 1178         blockIn = new DataInputStream(new BufferedInputStream(
 1179                                         new FileInputStream(blockInFile.getFD()), 
 1180                                         BUFFER_SIZE));
 1181         
 1182         if ( targets != null ) {
 1183           //
 1184           // Header info
 1185           //
 1186           out.writeShort( DATA_TRANFER_VERSION );
 1187           out.writeByte( OP_WRITE_BLOCK );
 1188           out.writeLong( block.getBlockId() );
 1189           out.writeInt(targets.length-1);
 1190           for (int i = 1; i < targets.length; i++) {
 1191             targets[i].write( out );
 1192           }
 1193         } else {
 1194           out.writeShort( OP_STATUS_SUCCESS );          
 1195         }
 1196   
 1197         checksum.writeHeader( out );
 1198         
 1199         if ( targets == null ) {
 1200           out.writeLong( offset );
 1201         }
 1202         
 1203         while ( endOffset >= offset ) {
 1204           // Write one data chunk per loop.
 1205           int len = (int) Math.min( endOffset - offset, bytesPerChecksum );
 1206           if ( len > 0 ) {
 1207             blockIn.readFully( buf, 0, len );
 1208             totalRead += len;
 1209             
 1210             if ( checksumSize > 0 && checksumIn != null ) {
 1211               try {
 1212                 checksumIn.readFully( buf, len, checksumSize );
 1213                 totalRead += checksumSize;
 1214               } catch ( IOException e ) {
 1215                 LOG.warn( " Could not read checksum for data at offset " +
 1216                           offset + " for block " + block + " got : " + 
 1217                           StringUtils.stringifyException(e) );
 1218                 FileUtil.closeStream( checksumIn );
 1219                 checksumIn = null;
 1220                 if ( corruptChecksumOk ) {
 1221                   // Just fill the array with zeros.
 1222                   Arrays.fill( buf, len, len + checksumSize, (byte)0 );
 1223                 } else {
 1224                   throw e;
 1225                 }
 1226               }
 1227             }
 1228           }
 1229   
 1230           out.writeInt( len );
 1231           out.write( buf, 0, len + checksumSize );
 1232           
 1233           if ( offset == endOffset ) {
 1234             out.flush();
 1235             // We are not waiting for response from target.
 1236             break;
 1237           }
 1238           offset += len;
 1239         }
 1240       } finally {
 1241         FileUtil.closeStream( blockInFile );
 1242         FileUtil.closeStream( checksumIn );
 1243         FileUtil.closeStream( blockIn );
 1244         FileUtil.closeStream( out );
 1245       }
 1246       
 1247       return totalRead;
 1248     }
 1249   
 1250     /**
 1251      * Used for transferring a block of data.  This class
 1252      * sends a piece of data to another DataNode.
 1253      */
 1254     class DataTransfer implements Runnable {
 1255       DatanodeInfo targets[];
 1256       Block b;
 1257   
 1258       /**
 1259        * Connect to the first item in the target list.  Pass along the 
 1260        * entire target list, the block, and the data.
 1261        */
 1262       public DataTransfer(DatanodeInfo targets[], Block b) throws IOException {
 1263         this.targets = targets;
 1264         this.b = b;
 1265       }
 1266   
 1267       /**
 1268        * Do the deed, write the bytes
 1269        */
 1270       public void run() {
 1271         xmitsInProgress++;
 1272         Socket sock = null;
 1273         
 1274         try {
 1275           InetSocketAddress curTarget = 
 1276             createSocketAddr(targets[0].getName());
 1277           sock = new Socket();  
 1278           sock.connect(curTarget, READ_TIMEOUT);
 1279           sock.setSoTimeout(READ_TIMEOUT);
 1280           sendBlock( sock, b, 0, -1, targets );
 1281           LOG.info( "Transmitted block " + b + " to " + curTarget );
 1282   
 1283         } catch ( IOException ie ) {
 1284           LOG.warn( "Failed to transfer " + b + " to " + 
 1285                     targets[0].getName() + " got " + 
 1286                     StringUtils.stringifyException( ie ) );
 1287         } finally {
 1288           FileUtil.closeSocket(sock);
 1289           xmitsInProgress--;
 1290         }
 1291       }
 1292     }
 1293   
 1294     /**
 1295      * No matter what kind of exception we get, keep retrying to offerService().
 1296      * That's the loop that connects to the NameNode and provides basic DataNode
 1297      * functionality.
 1298      *
 1299      * Only stop when "shouldRun" is turned off (which can only happen at shutdown).
 1300      */
 1301     public void run() {
 1302       LOG.info("In DataNode.run, data = " + data);
 1303   
 1304       // start dataXceiveServer
 1305       dataXceiveServer.start();
 1306           
 1307       while (shouldRun) {
 1308         try {
 1309           startDistributedUpgradeIfNeeded();
 1310           offerService();
 1311         } catch (Exception ex) {
 1312           LOG.error("Exception: " + StringUtils.stringifyException(ex));
 1313           if (shouldRun) {
 1314             try {
 1315               Thread.sleep(5000);
 1316             } catch (InterruptedException ie) {
 1317             }
 1318           }
 1319         }
 1320       }
 1321           
 1322       // wait for dataXceiveServer to terminate
 1323       try {
 1324         this.dataXceiveServer.join();
 1325       } catch (InterruptedException ie) {
 1326       }
 1327           
 1328       LOG.info("Finishing DataNode in: "+data);
 1329     }
 1330       
 1331     /** Start datanode daemon.
 1332      */
 1333     public static DataNode run(Configuration conf) throws IOException {
 1334       String[] dataDirs = conf.getStrings("dfs.data.dir");
 1335       DataNode dn = makeInstance(dataDirs, conf);
 1336       if (dn != null) {
 1337         dataNodeThread = new Thread(dn, "DataNode: [" +
 1338                                     StringUtils.arrayToString(dataDirs) + "]");
 1339         dataNodeThread.setDaemon(true); // needed for JUnit testing
 1340         dataNodeThread.start();
 1341       }
 1342       return dn;
 1343     }
 1344   
 1345     /** Start a single datanode daemon and wait for it to finish.
 1346      *  If this thread is specifically interrupted, it will stop waiting.
 1347      */
 1348     static DataNode createDataNode(String args[],
 1349                                    Configuration conf) throws IOException {
 1350       if (conf == null)
 1351         conf = new Configuration();
 1352       if (!parseArguments(args, conf)) {
 1353         printUsage();
 1354         return null;
 1355       }
 1356       return run(conf);
 1357     }
 1358   
 1359     void join() {
 1360       if (dataNodeThread != null) {
 1361         try {
 1362           dataNodeThread.join();
 1363         } catch (InterruptedException e) {}
 1364       }
 1365     }
 1366   
 1367     /**
 1368      * Make an instance of DataNode after ensuring that at least one of the
 1369      * given data directories (and their parent directories, if necessary)
 1370      * can be created.
 1371      * @param dataDirs List of directories, where the new DataNode instance should
 1372      * keep its files.
 1373      * @param conf Configuration instance to use.
 1374      * @return DataNode instance for given list of data dirs and conf, or null if
 1375      * no directory from this directory list can be created.
 1376      * @throws IOException
 1377      */
 1378     static DataNode makeInstance(String[] dataDirs, Configuration conf)
 1379       throws IOException {
 1380       ArrayList<File> dirs = new ArrayList<File>();
 1381       for (int i = 0; i < dataDirs.length; i++) {
 1382         File data = new File(dataDirs[i]);
 1383         try {
 1384           DiskChecker.checkDir(data);
 1385           dirs.add(data);
 1386         } catch(DiskErrorException e) {
 1387           LOG.warn("Invalid directory in dfs.data.dir: " + e.getMessage());
 1388         }
 1389       }
 1390       if (dirs.size() > 0) 
 1391         return new DataNode(conf, dirs);
 1392       LOG.error("All directories in dfs.data.dir are invalid.");
 1393       return null;
 1394     }
 1395   
 1396     public String toString() {
 1397       return "DataNode{" +
 1398         "data=" + data +
 1399         ", localName='" + dnRegistration.getName() + "'" +
 1400         ", storageID='" + dnRegistration.getStorageID() + "'" +
 1401         ", xmitsInProgress=" + xmitsInProgress +
 1402         "}";
 1403     }
 1404     
 1405     private static void printUsage() {
 1406       System.err.println("Usage: java DataNode");
 1407       System.err.println("           [-r, --rack <network location>] |");
 1408       System.err.println("           [-rollback]");
 1409     }
 1410   
 1411     /**
 1412      * Parse and verify command line arguments and set configuration parameters.
 1413      *
 1414      * @return false if passed argements are incorrect
 1415      */
 1416     private static boolean parseArguments(String args[], 
 1417                                           Configuration conf) {
 1418       int argsLen = (args == null) ? 0 : args.length;
 1419       StartupOption startOpt = StartupOption.REGULAR;
 1420       String networkLoc = null;
 1421       for(int i=0; i < argsLen; i++) {
 1422         String cmd = args[i];
 1423         if ("-r".equalsIgnoreCase(cmd) || "--rack".equalsIgnoreCase(cmd)) {
 1424           if (i==args.length-1)
 1425             return false;
 1426           networkLoc = args[++i];
 1427           if (networkLoc.startsWith("-"))
 1428             return false;
 1429         } else if ("-rollback".equalsIgnoreCase(cmd)) {
 1430           startOpt = StartupOption.ROLLBACK;
 1431         } else if ("-regular".equalsIgnoreCase(cmd)) {
 1432           startOpt = StartupOption.REGULAR;
 1433         } else
 1434           return false;
 1435       }
 1436       if (networkLoc != null)
 1437         conf.set("dfs.datanode.rack", NodeBase.normalize(networkLoc));
 1438       setStartupOption(conf, startOpt);
 1439       return true;
 1440     }
 1441   
 1442     private static void setStartupOption(Configuration conf, StartupOption opt) {
 1443       conf.set("dfs.datanode.startup", opt.toString());
 1444     }
 1445   
 1446     static StartupOption getStartupOption(Configuration conf) {
 1447       return StartupOption.valueOf(conf.get("dfs.datanode.startup",
 1448                                             StartupOption.REGULAR.toString()));
 1449     }
 1450   
 1451     /* Get the network location by running a script configured in conf */
 1452     private static String getNetworkLoc(Configuration conf) 
 1453       throws IOException {
 1454       String locScript = conf.get("dfs.network.script");
 1455       if (locScript == null) 
 1456         return NetworkTopology.DEFAULT_RACK;
 1457   
 1458       LOG.info("Starting to run script to get datanode network location");
 1459       Process p = Runtime.getRuntime().exec(locScript);
 1460       StringBuffer networkLoc = new StringBuffer();
 1461       final BufferedReader inR = new BufferedReader(
 1462                                                     new InputStreamReader(p.getInputStream()));
 1463       final BufferedReader errR = new BufferedReader(
 1464                                                      new InputStreamReader(p.getErrorStream()));
 1465   
 1466       // read & log any error messages from the running script
 1467       Thread errThread = new Thread() {
 1468           public void start() {
 1469             try {
 1470               String errLine = errR.readLine();
 1471               while(errLine != null) {
 1472                 LOG.warn("Network script error: "+errLine);
 1473                 errLine = errR.readLine();
 1474               }
 1475             } catch(IOException e) {
 1476                       
 1477             }
 1478           }
 1479         };
 1480       try {
 1481         errThread.start();
 1482               
 1483         // fetch output from the process
 1484         String line = inR.readLine();
 1485         while(line != null) {
 1486           networkLoc.append(line);
 1487           line = inR.readLine();
 1488         }
 1489         try {
 1490           // wait for the process to finish
 1491           int returnVal = p.waitFor();
 1492           // check the exit code
 1493           if (returnVal != 0) {
 1494             throw new IOException("Process exits with nonzero status: "+locScript);
 1495           }
 1496         } catch (InterruptedException e) {
 1497           throw new IOException(e.getMessage());
 1498         } finally {
 1499           try {
 1500             // make sure that the error thread exits
 1501             errThread.join();
 1502           } catch (InterruptedException je) {
 1503             LOG.warn(StringUtils.stringifyException(je));
 1504           }
 1505         }
 1506       } finally {
 1507         // close in & error streams
 1508         try {
 1509           inR.close();
 1510         } catch (IOException ine) {
 1511           throw ine;
 1512         } finally {
 1513           errR.close();
 1514         }
 1515       }
 1516   
 1517       return networkLoc.toString();
 1518     }
 1519   
 1520     /**
 1521      */
 1522     public static void main(String args[]) {
 1523       try {
 1524         StringUtils.startupShutdownMessage(DataNode.class, args, LOG);
 1525         DataNode datanode = createDataNode(args, null);
 1526         if (datanode != null)
 1527           datanode.join();
 1528       } catch (Throwable e) {
 1529         LOG.error(StringUtils.stringifyException(e));
 1530         System.exit(-1);
 1531       }
 1532     }
 1533   
 1534   }

Save This Page
Home » hadoop-0.14.4 » org.apache » hadoop » dfs » [javadoc | source]