1 /*
2 * Copyright 1999,2004-2005 The Apache Software Foundation.
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16 package org.apache.catalina.cluster.io;
17
18 import java.nio.ByteBuffer;
19 import java.nio.channels.Selector;
20 import java.nio.channels.SocketChannel;
21
22 import org.apache.catalina.cluster.tcp.ClusterData;
23
24 /**
25 * The object reader object is an object used in conjunction with
26 * java.nio TCP messages. This object stores the message bytes in a
27 * <code>XByteBuffer</code> until a full package has been received.
28 * When a full package has been received, the append method will call messageDataReceived
29 * on the callback object associated with this object reader.<BR>
30 * This object uses an XByteBuffer which is an extendable object buffer that also allows
31 * for message encoding and decoding.
32 *
33 * @author Filip Hanik
34 * @author Peter Rossbach
35 * @version $Revision: 304032 $, $Date: 2005-07-27 11:11:55 -0400 (Wed, 27 Jul 2005) $
36 */
37 public class ObjectReader {
38
39 private SocketChannel channel;
40
41 private Selector selector;
42
43 private ListenCallback callback;
44
45 private XByteBuffer buffer;
46
47 /**
48 * Create XByteBuffer and store parameter
49 * @param channel
50 * @param selector
51 * @param callback
52 */
53 public ObjectReader(SocketChannel channel, Selector selector, ListenCallback callback) {
54 this.channel = channel;
55 this.selector = selector;
56 this.callback = callback;
57 this.buffer = new XByteBuffer();
58 }
59
60 /**
61 * get the current SimpleTcpCluster
62 * @return Returns the callback.
63 */
64 public ListenCallback getCallback() {
65 return callback;
66 }
67
68 /**
69 * Get underlying NIO channel
70 * @return The socket
71 */
72 public SocketChannel getChannel() {
73 return this.channel;
74 }
75
76 /**
77 * Append new bytes to buffer.
78 * @see XByteBuffer#countPackages()
79 * @param data new transfer buffer
80 * @param off offset
81 * @param len length in buffer
82 * @return number of messages that sended to callback
83 * @throws java.io.IOException
84 */
85 public int append(byte[] data,int off,int len) throws java.io.IOException {
86 buffer.append(data,off,len);
87 int pkgCnt = buffer.countPackages();
88 return pkgCnt;
89 }
90
91 /**
92 * Send buffer to cluster listener (callback).
93 * Is message complete receiver send message to callback?
94 *
95 * @see org.apache.catalina.cluster.tcp.ClusterReceiverBase#messageDataReceived(ClusterData)
96 * @see XByteBuffer#doesPackageExist()
97 * @see XByteBuffer#extractPackage(boolean)
98 *
99 * @return number of received packages/messages
100 * @throws java.io.IOException
101 */
102 public int execute() throws java.io.IOException {
103 int pkgCnt = 0;
104 boolean pkgExists = buffer.doesPackageExist();
105 while ( pkgExists ) {
106 ClusterData data = buffer.extractPackage(true);
107 getCallback().messageDataReceived(data);
108 pkgCnt++;
109 pkgExists = buffer.doesPackageExist();
110 }
111 return pkgCnt;
112 }
113
114 /**
115 * Write Ack to sender
116 * @param buf
117 * @return The bytes written count
118 * @throws java.io.IOException
119 */
120 public int write(ByteBuffer buf) throws java.io.IOException {
121 return getChannel().write(buf);
122 }
123
124 }