001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one
003     * or more contributor license agreements.  See the NOTICE file
004     * distributed with this work for additional information
005     * regarding copyright ownership.  The ASF licenses this file
006     * to you under the Apache License, Version 2.0 (the
007     * "License"); you may not use this file except in compliance
008     * with the License.  You may obtain a copy of the License at
009     *
010     *     http://www.apache.org/licenses/LICENSE-2.0
011     *
012     * Unless required by applicable law or agreed to in writing, software
013     * distributed under the License is distributed on an "AS IS" BASIS,
014     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015     * See the License for the specific language governing permissions and
016     * limitations under the License.
017     */
018    
019    package org.apache.hadoop.record;
020    
021    import java.io.InputStreamReader;
022    import java.io.InputStream;
023    import java.io.IOException;
024    import java.io.PushbackReader;
025    import java.io.UnsupportedEncodingException;
026    
027    import org.apache.hadoop.classification.InterfaceAudience;
028    import org.apache.hadoop.classification.InterfaceStability;
029    
030    /**
031     * @deprecated Replaced by <a href="https://hadoop.apache.org/avro/">Avro</a>.
032     */
033    @Deprecated
034    @InterfaceAudience.Public
035    @InterfaceStability.Stable
036    public class CsvRecordInput implements RecordInput {
037        
038      private PushbackReader stream;
039        
040      private class CsvIndex implements Index {
041        @Override
042        public boolean done() {
043          char c = '\0';
044          try {
045            c = (char) stream.read();
046            stream.unread(c);
047          } catch (IOException ex) {
048          }
049          return (c == '}') ? true : false;
050        }
051        @Override
052        public void incr() {}
053      }
054        
055      private void throwExceptionOnError(String tag) throws IOException {
056        throw new IOException("Error deserializing "+tag);
057      }
058        
059      private String readField(String tag) throws IOException {
060        try {
061          StringBuilder buf = new StringBuilder();
062          while (true) {
063            char c = (char) stream.read();
064            switch (c) {
065            case ',':
066              return buf.toString();
067            case '}':
068            case '\n':
069            case '\r':
070              stream.unread(c);
071              return buf.toString();
072            default:
073              buf.append(c);
074            }
075          }
076        } catch (IOException ex) {
077          throw new IOException("Error reading "+tag);
078        }
079      }
080        
081      /** Creates a new instance of CsvRecordInput */
082      public CsvRecordInput(InputStream in) {
083        try {
084          stream = new PushbackReader(new InputStreamReader(in, "UTF-8"));
085        } catch (UnsupportedEncodingException ex) {
086          throw new RuntimeException(ex);
087        }
088      }
089        
090      @Override
091      public byte readByte(String tag) throws IOException {
092        return (byte) readLong(tag);
093      }
094        
095      @Override
096      public boolean readBool(String tag) throws IOException {
097        String sval = readField(tag);
098        return "T".equals(sval) ? true : false;
099      }
100        
101      @Override
102      public int readInt(String tag) throws IOException {
103        return (int) readLong(tag);
104      }
105        
106      @Override
107      public long readLong(String tag) throws IOException {
108        String sval = readField(tag);
109        try {
110          long lval = Long.parseLong(sval);
111          return lval;
112        } catch (NumberFormatException ex) {
113          throw new IOException("Error deserializing "+tag);
114        }
115      }
116        
117      @Override
118      public float readFloat(String tag) throws IOException {
119        return (float) readDouble(tag);
120      }
121        
122      @Override
123      public double readDouble(String tag) throws IOException {
124        String sval = readField(tag);
125        try {
126          double dval = Double.parseDouble(sval);
127          return dval;
128        } catch (NumberFormatException ex) {
129          throw new IOException("Error deserializing "+tag);
130        }
131      }
132        
133      @Override
134      public String readString(String tag) throws IOException {
135        String sval = readField(tag);
136        return Utils.fromCSVString(sval);
137      }
138        
139      @Override
140      public Buffer readBuffer(String tag) throws IOException {
141        String sval = readField(tag);
142        return Utils.fromCSVBuffer(sval);
143      }
144        
145      @Override
146      public void startRecord(String tag) throws IOException {
147        if (tag != null && !tag.isEmpty()) {
148          char c1 = (char) stream.read();
149          char c2 = (char) stream.read();
150          if (c1 != 's' || c2 != '{') {
151            throw new IOException("Error deserializing "+tag);
152          }
153        }
154      }
155        
156      @Override
157      public void endRecord(String tag) throws IOException {
158        char c = (char) stream.read();
159        if (tag == null || tag.isEmpty()) {
160          if (c != '\n' && c != '\r') {
161            throw new IOException("Error deserializing record.");
162          } else {
163            return;
164          }
165        }
166            
167        if (c != '}') {
168          throw new IOException("Error deserializing "+tag);
169        }
170        c = (char) stream.read();
171        if (c != ',') {
172          stream.unread(c);
173        }
174            
175        return;
176      }
177        
178      @Override
179      public Index startVector(String tag) throws IOException {
180        char c1 = (char) stream.read();
181        char c2 = (char) stream.read();
182        if (c1 != 'v' || c2 != '{') {
183          throw new IOException("Error deserializing "+tag);
184        }
185        return new CsvIndex();
186      }
187        
188      @Override
189      public void endVector(String tag) throws IOException {
190        char c = (char) stream.read();
191        if (c != '}') {
192          throw new IOException("Error deserializing "+tag);
193        }
194        c = (char) stream.read();
195        if (c != ',') {
196          stream.unread(c);
197        }
198        return;
199      }
200        
201      @Override
202      public Index startMap(String tag) throws IOException {
203        char c1 = (char) stream.read();
204        char c2 = (char) stream.read();
205        if (c1 != 'm' || c2 != '{') {
206          throw new IOException("Error deserializing "+tag);
207        }
208        return new CsvIndex();
209      }
210        
211      @Override
212      public void endMap(String tag) throws IOException {
213        char c = (char) stream.read();
214        if (c != '}') {
215          throw new IOException("Error deserializing "+tag);
216        }
217        c = (char) stream.read();
218        if (c != ',') {
219          stream.unread(c);
220        }
221        return;
222      }
223    }