001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 019package org.apache.hadoop.record; 020 021import java.io.InputStreamReader; 022import java.io.InputStream; 023import java.io.IOException; 024import java.io.PushbackReader; 025import java.io.UnsupportedEncodingException; 026 027import org.apache.hadoop.classification.InterfaceAudience; 028import org.apache.hadoop.classification.InterfaceStability; 029 030/** 031 * @deprecated Replaced by <a href="https://hadoop.apache.org/avro/">Avro</a>. 032 */ 033@Deprecated 034@InterfaceAudience.Public 035@InterfaceStability.Stable 036public class CsvRecordInput implements RecordInput { 037 038 private PushbackReader stream; 039 040 private class CsvIndex implements Index { 041 @Override 042 public boolean done() { 043 char c = '\0'; 044 try { 045 c = (char) stream.read(); 046 stream.unread(c); 047 } catch (IOException ex) { 048 } 049 return (c == '}') ? true : false; 050 } 051 @Override 052 public void incr() {} 053 } 054 055 private void throwExceptionOnError(String tag) throws IOException { 056 throw new IOException("Error deserializing "+tag); 057 } 058 059 private String readField(String tag) throws IOException { 060 try { 061 StringBuilder buf = new StringBuilder(); 062 while (true) { 063 char c = (char) stream.read(); 064 switch (c) { 065 case ',': 066 return buf.toString(); 067 case '}': 068 case '\n': 069 case '\r': 070 stream.unread(c); 071 return buf.toString(); 072 default: 073 buf.append(c); 074 } 075 } 076 } catch (IOException ex) { 077 throw new IOException("Error reading "+tag); 078 } 079 } 080 081 /** Creates a new instance of CsvRecordInput */ 082 public CsvRecordInput(InputStream in) { 083 try { 084 stream = new PushbackReader(new InputStreamReader(in, "UTF-8")); 085 } catch (UnsupportedEncodingException ex) { 086 throw new RuntimeException(ex); 087 } 088 } 089 090 @Override 091 public byte readByte(String tag) throws IOException { 092 return (byte) readLong(tag); 093 } 094 095 @Override 096 public boolean readBool(String tag) throws IOException { 097 String sval = readField(tag); 098 return "T".equals(sval) ? true : false; 099 } 100 101 @Override 102 public int readInt(String tag) throws IOException { 103 return (int) readLong(tag); 104 } 105 106 @Override 107 public long readLong(String tag) throws IOException { 108 String sval = readField(tag); 109 try { 110 long lval = Long.parseLong(sval); 111 return lval; 112 } catch (NumberFormatException ex) { 113 throw new IOException("Error deserializing "+tag); 114 } 115 } 116 117 @Override 118 public float readFloat(String tag) throws IOException { 119 return (float) readDouble(tag); 120 } 121 122 @Override 123 public double readDouble(String tag) throws IOException { 124 String sval = readField(tag); 125 try { 126 double dval = Double.parseDouble(sval); 127 return dval; 128 } catch (NumberFormatException ex) { 129 throw new IOException("Error deserializing "+tag); 130 } 131 } 132 133 @Override 134 public String readString(String tag) throws IOException { 135 String sval = readField(tag); 136 return Utils.fromCSVString(sval); 137 } 138 139 @Override 140 public Buffer readBuffer(String tag) throws IOException { 141 String sval = readField(tag); 142 return Utils.fromCSVBuffer(sval); 143 } 144 145 @Override 146 public void startRecord(String tag) throws IOException { 147 if (tag != null && !tag.isEmpty()) { 148 char c1 = (char) stream.read(); 149 char c2 = (char) stream.read(); 150 if (c1 != 's' || c2 != '{') { 151 throw new IOException("Error deserializing "+tag); 152 } 153 } 154 } 155 156 @Override 157 public void endRecord(String tag) throws IOException { 158 char c = (char) stream.read(); 159 if (tag == null || tag.isEmpty()) { 160 if (c != '\n' && c != '\r') { 161 throw new IOException("Error deserializing record."); 162 } else { 163 return; 164 } 165 } 166 167 if (c != '}') { 168 throw new IOException("Error deserializing "+tag); 169 } 170 c = (char) stream.read(); 171 if (c != ',') { 172 stream.unread(c); 173 } 174 175 return; 176 } 177 178 @Override 179 public Index startVector(String tag) throws IOException { 180 char c1 = (char) stream.read(); 181 char c2 = (char) stream.read(); 182 if (c1 != 'v' || c2 != '{') { 183 throw new IOException("Error deserializing "+tag); 184 } 185 return new CsvIndex(); 186 } 187 188 @Override 189 public void endVector(String tag) throws IOException { 190 char c = (char) stream.read(); 191 if (c != '}') { 192 throw new IOException("Error deserializing "+tag); 193 } 194 c = (char) stream.read(); 195 if (c != ',') { 196 stream.unread(c); 197 } 198 return; 199 } 200 201 @Override 202 public Index startMap(String tag) throws IOException { 203 char c1 = (char) stream.read(); 204 char c2 = (char) stream.read(); 205 if (c1 != 'm' || c2 != '{') { 206 throw new IOException("Error deserializing "+tag); 207 } 208 return new CsvIndex(); 209 } 210 211 @Override 212 public void endMap(String tag) throws IOException { 213 char c = (char) stream.read(); 214 if (c != '}') { 215 throw new IOException("Error deserializing "+tag); 216 } 217 c = (char) stream.read(); 218 if (c != ',') { 219 stream.unread(c); 220 } 221 return; 222 } 223}