001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 019package org.apache.hadoop.util; 020 021import java.io.DataInput; 022import java.io.IOException; 023 024import org.apache.hadoop.ipc.CallerContext; 025import org.apache.hadoop.ipc.RPC; 026import org.apache.hadoop.ipc.protobuf.IpcConnectionContextProtos.IpcConnectionContextProto; 027import org.apache.hadoop.ipc.protobuf.IpcConnectionContextProtos.UserInformationProto; 028import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.*; 029import org.apache.hadoop.security.SaslRpcServer.AuthMethod; 030import org.apache.hadoop.security.UserGroupInformation; 031import org.apache.htrace.core.Span; 032import org.apache.htrace.core.Tracer; 033 034import com.google.protobuf.ByteString; 035 036public abstract class ProtoUtil { 037 038 /** 039 * Read a variable length integer in the same format that ProtoBufs encodes. 040 * @param in the input stream to read from 041 * @return the integer 042 * @throws IOException if it is malformed or EOF. 043 */ 044 public static int readRawVarint32(DataInput in) throws IOException { 045 byte tmp = in.readByte(); 046 if (tmp >= 0) { 047 return tmp; 048 } 049 int result = tmp & 0x7f; 050 if ((tmp = in.readByte()) >= 0) { 051 result |= tmp << 7; 052 } else { 053 result |= (tmp & 0x7f) << 7; 054 if ((tmp = in.readByte()) >= 0) { 055 result |= tmp << 14; 056 } else { 057 result |= (tmp & 0x7f) << 14; 058 if ((tmp = in.readByte()) >= 0) { 059 result |= tmp << 21; 060 } else { 061 result |= (tmp & 0x7f) << 21; 062 result |= (tmp = in.readByte()) << 28; 063 if (tmp < 0) { 064 // Discard upper 32 bits. 065 for (int i = 0; i < 5; i++) { 066 if (in.readByte() >= 0) { 067 return result; 068 } 069 } 070 throw new IOException("Malformed varint"); 071 } 072 } 073 } 074 } 075 return result; 076 } 077 078 079 /** 080 * This method creates the connection context using exactly the same logic 081 * as the old connection context as was done for writable where 082 * the effective and real users are set based on the auth method. 083 * 084 */ 085 public static IpcConnectionContextProto makeIpcConnectionContext( 086 final String protocol, 087 final UserGroupInformation ugi, final AuthMethod authMethod) { 088 IpcConnectionContextProto.Builder result = IpcConnectionContextProto.newBuilder(); 089 if (protocol != null) { 090 result.setProtocol(protocol); 091 } 092 UserInformationProto.Builder ugiProto = UserInformationProto.newBuilder(); 093 if (ugi != null) { 094 /* 095 * In the connection context we send only additional user info that 096 * is not derived from the authentication done during connection setup. 097 */ 098 if (authMethod == AuthMethod.KERBEROS) { 099 // Real user was established as part of the connection. 100 // Send effective user only. 101 ugiProto.setEffectiveUser(ugi.getUserName()); 102 } else if (authMethod == AuthMethod.TOKEN) { 103 // With token, the connection itself establishes 104 // both real and effective user. Hence send none in header. 105 } else { // Simple authentication 106 // No user info is established as part of the connection. 107 // Send both effective user and real user 108 ugiProto.setEffectiveUser(ugi.getUserName()); 109 if (ugi.getRealUser() != null) { 110 ugiProto.setRealUser(ugi.getRealUser().getUserName()); 111 } 112 } 113 } 114 result.setUserInfo(ugiProto); 115 return result.build(); 116 } 117 118 public static UserGroupInformation getUgi(IpcConnectionContextProto context) { 119 if (context.hasUserInfo()) { 120 UserInformationProto userInfo = context.getUserInfo(); 121 return getUgi(userInfo); 122 } else { 123 return null; 124 } 125 } 126 127 public static UserGroupInformation getUgi(UserInformationProto userInfo) { 128 UserGroupInformation ugi = null; 129 String effectiveUser = userInfo.hasEffectiveUser() ? userInfo 130 .getEffectiveUser() : null; 131 String realUser = userInfo.hasRealUser() ? userInfo.getRealUser() : null; 132 if (effectiveUser != null) { 133 if (realUser != null) { 134 UserGroupInformation realUserUgi = UserGroupInformation 135 .createRemoteUser(realUser); 136 ugi = UserGroupInformation 137 .createProxyUser(effectiveUser, realUserUgi); 138 } else { 139 ugi = org.apache.hadoop.security.UserGroupInformation 140 .createRemoteUser(effectiveUser); 141 } 142 } 143 return ugi; 144 } 145 146 static RpcKindProto convert(RPC.RpcKind kind) { 147 switch (kind) { 148 case RPC_BUILTIN: return RpcKindProto.RPC_BUILTIN; 149 case RPC_WRITABLE: return RpcKindProto.RPC_WRITABLE; 150 case RPC_PROTOCOL_BUFFER: return RpcKindProto.RPC_PROTOCOL_BUFFER; 151 } 152 return null; 153 } 154 155 156 public static RPC.RpcKind convert( RpcKindProto kind) { 157 switch (kind) { 158 case RPC_BUILTIN: return RPC.RpcKind.RPC_BUILTIN; 159 case RPC_WRITABLE: return RPC.RpcKind.RPC_WRITABLE; 160 case RPC_PROTOCOL_BUFFER: return RPC.RpcKind.RPC_PROTOCOL_BUFFER; 161 } 162 return null; 163 } 164 165 public static RpcRequestHeaderProto makeRpcRequestHeader(RPC.RpcKind rpcKind, 166 RpcRequestHeaderProto.OperationProto operation, int callId, 167 int retryCount, byte[] uuid) { 168 RpcRequestHeaderProto.Builder result = RpcRequestHeaderProto.newBuilder(); 169 result.setRpcKind(convert(rpcKind)).setRpcOp(operation).setCallId(callId) 170 .setRetryCount(retryCount).setClientId(ByteString.copyFrom(uuid)); 171 172 // Add tracing info if we are currently tracing. 173 Span span = Tracer.getCurrentSpan(); 174 if (span != null) { 175 result.setTraceInfo(RPCTraceInfoProto.newBuilder() 176 .setTraceId(span.getSpanId().getHigh()) 177 .setParentId(span.getSpanId().getLow()) 178 .build()); 179 } 180 181 // Add caller context if it is not null 182 CallerContext callerContext = CallerContext.getCurrent(); 183 if (callerContext != null && callerContext.isContextValid()) { 184 RPCCallerContextProto.Builder contextBuilder = RPCCallerContextProto 185 .newBuilder().setContext(callerContext.getContext()); 186 if (callerContext.getSignature() != null) { 187 contextBuilder.setSignature( 188 ByteString.copyFrom(callerContext.getSignature())); 189 } 190 result.setCallerContext(contextBuilder); 191 } 192 193 return result.build(); 194 } 195}