001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements. See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership. The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License. You may obtain a copy of the License at
009 *
010 * http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019 package org.apache.hadoop.record;
020
021 import java.io.InputStreamReader;
022 import java.io.InputStream;
023 import java.io.IOException;
024 import java.io.PushbackReader;
025 import java.io.UnsupportedEncodingException;
026
027 import org.apache.hadoop.classification.InterfaceAudience;
028 import org.apache.hadoop.classification.InterfaceStability;
029
030 /**
031 * @deprecated Replaced by <a href="https://hadoop.apache.org/avro/">Avro</a>.
032 */
033 @Deprecated
034 @InterfaceAudience.Public
035 @InterfaceStability.Stable
036 public class CsvRecordInput implements RecordInput {
037
038 private PushbackReader stream;
039
040 private class CsvIndex implements Index {
041 @Override
042 public boolean done() {
043 char c = '\0';
044 try {
045 c = (char) stream.read();
046 stream.unread(c);
047 } catch (IOException ex) {
048 }
049 return (c == '}') ? true : false;
050 }
051 @Override
052 public void incr() {}
053 }
054
055 private void throwExceptionOnError(String tag) throws IOException {
056 throw new IOException("Error deserializing "+tag);
057 }
058
059 private String readField(String tag) throws IOException {
060 try {
061 StringBuilder buf = new StringBuilder();
062 while (true) {
063 char c = (char) stream.read();
064 switch (c) {
065 case ',':
066 return buf.toString();
067 case '}':
068 case '\n':
069 case '\r':
070 stream.unread(c);
071 return buf.toString();
072 default:
073 buf.append(c);
074 }
075 }
076 } catch (IOException ex) {
077 throw new IOException("Error reading "+tag);
078 }
079 }
080
081 /** Creates a new instance of CsvRecordInput */
082 public CsvRecordInput(InputStream in) {
083 try {
084 stream = new PushbackReader(new InputStreamReader(in, "UTF-8"));
085 } catch (UnsupportedEncodingException ex) {
086 throw new RuntimeException(ex);
087 }
088 }
089
090 @Override
091 public byte readByte(String tag) throws IOException {
092 return (byte) readLong(tag);
093 }
094
095 @Override
096 public boolean readBool(String tag) throws IOException {
097 String sval = readField(tag);
098 return "T".equals(sval) ? true : false;
099 }
100
101 @Override
102 public int readInt(String tag) throws IOException {
103 return (int) readLong(tag);
104 }
105
106 @Override
107 public long readLong(String tag) throws IOException {
108 String sval = readField(tag);
109 try {
110 long lval = Long.parseLong(sval);
111 return lval;
112 } catch (NumberFormatException ex) {
113 throw new IOException("Error deserializing "+tag);
114 }
115 }
116
117 @Override
118 public float readFloat(String tag) throws IOException {
119 return (float) readDouble(tag);
120 }
121
122 @Override
123 public double readDouble(String tag) throws IOException {
124 String sval = readField(tag);
125 try {
126 double dval = Double.parseDouble(sval);
127 return dval;
128 } catch (NumberFormatException ex) {
129 throw new IOException("Error deserializing "+tag);
130 }
131 }
132
133 @Override
134 public String readString(String tag) throws IOException {
135 String sval = readField(tag);
136 return Utils.fromCSVString(sval);
137 }
138
139 @Override
140 public Buffer readBuffer(String tag) throws IOException {
141 String sval = readField(tag);
142 return Utils.fromCSVBuffer(sval);
143 }
144
145 @Override
146 public void startRecord(String tag) throws IOException {
147 if (tag != null && !tag.isEmpty()) {
148 char c1 = (char) stream.read();
149 char c2 = (char) stream.read();
150 if (c1 != 's' || c2 != '{') {
151 throw new IOException("Error deserializing "+tag);
152 }
153 }
154 }
155
156 @Override
157 public void endRecord(String tag) throws IOException {
158 char c = (char) stream.read();
159 if (tag == null || tag.isEmpty()) {
160 if (c != '\n' && c != '\r') {
161 throw new IOException("Error deserializing record.");
162 } else {
163 return;
164 }
165 }
166
167 if (c != '}') {
168 throw new IOException("Error deserializing "+tag);
169 }
170 c = (char) stream.read();
171 if (c != ',') {
172 stream.unread(c);
173 }
174
175 return;
176 }
177
178 @Override
179 public Index startVector(String tag) throws IOException {
180 char c1 = (char) stream.read();
181 char c2 = (char) stream.read();
182 if (c1 != 'v' || c2 != '{') {
183 throw new IOException("Error deserializing "+tag);
184 }
185 return new CsvIndex();
186 }
187
188 @Override
189 public void endVector(String tag) throws IOException {
190 char c = (char) stream.read();
191 if (c != '}') {
192 throw new IOException("Error deserializing "+tag);
193 }
194 c = (char) stream.read();
195 if (c != ',') {
196 stream.unread(c);
197 }
198 return;
199 }
200
201 @Override
202 public Index startMap(String tag) throws IOException {
203 char c1 = (char) stream.read();
204 char c2 = (char) stream.read();
205 if (c1 != 'm' || c2 != '{') {
206 throw new IOException("Error deserializing "+tag);
207 }
208 return new CsvIndex();
209 }
210
211 @Override
212 public void endMap(String tag) throws IOException {
213 char c = (char) stream.read();
214 if (c != '}') {
215 throw new IOException("Error deserializing "+tag);
216 }
217 c = (char) stream.read();
218 if (c != ',') {
219 stream.unread(c);
220 }
221 return;
222 }
223 }