本文主要阐述HDFSRPC安全认证相关的实现。主要介绍Token相关的实现。
写在前面
相关blog
https://blog.csdn.net/hncscwc/article/details/124722784
https://blog.csdn.net/hncscwc/article/details/124958357
Token由来
在探究完Kerberos,我一直在想一个问题,rpcConnection已经完成了验证,那为何还需要token?首先需要对yarn有一定的了解,我们知道mapreduce框架是把目标变成多个map,然后reduce出结果。Yarn在执行多个map、reduce的时候,是通过container来运行的。Container本质上是一个独立程序,执行了yarn分配的任务。当Container进程要去访问hdfs的时候,如果使用Kerberos,kdc验证服务存在的不可靠和性能问题(多机多container并发极高)必然会极大的限制大数据平台的稳定,尤其是当有大量用户请求需要通过kdc来获取tgt票据时。因此Token认证被引入充当kerberos的补充,在兼顾安全认证的同时,性能没有较大的损耗。在hadoop中,token主要包括DelegationToken,以及其他一些token,例如之前文件介绍过的BlockToken,以及yarn中一系列的token。
Token中yarn container流程图
Token的应用
当完成kerberos验证以后,服务主体的可以通过getDelegationToken接口来获取token。当服务主体下面的的进程需要去访问hdfs的时候,可以通过token来访问。
Token的验证也在rpc的sasl中,但是步骤跟简单,如下:
server当收到client negotiate请求以后,会返回多个auth。
auths {method: "TOKEN"mechanism: "DIGEST-MD5"protocol: ""serverId: "default"challenge: "realm=\"default\",nonce=\"svFDnzmhsk40oN5z6vnUFgYgawR17w+XvxiX1Z3M\",charset=utf-8,algorithm=md5-sess"
}
auths {method: "KERBEROS"mechanism: "GSSAPI"protocol: "root"serverId: "node17"
}
client接收完negotiate应答后,可以通过服务主体获取的token来initSaslClient,然后发送Initiate请求。Server接收到Initiate请求,会通过token初始化saslServer,不同于Kerberos,saslserver验证完token会立马complete。这时候server会直接返回success应答给客户端。客户端接收到success应答以后即完成SaslClient的初始化。
可以看出token验证的整个过程更简单,而且本质上就是server验证了一下client的token,消耗更少,性能更高。
token验证本身与用户密码生成没有任何关系,主要都是java原生类来实现。代码如下:
public class TokenTest {public static final String SASL_DEFAULT_REALM = "default";public static final String USERNAME = "tokentestuser";public static final char[] PASSWORD = new char[]{'1'};public static void main(String[] args) throws SaslException {String mechanism = "DIGEST-MD5";CallbackHandler serverCallback = new SaslDigestCallbackHandler();String protocol = "";String serverId = SASL_DEFAULT_REALM;SaslServer saslServer = FastSaslServerFactory.getInstance().createSaslServer(mechanism, protocol, serverId, null, serverCallback);String saslUser = null;Map<String, String> saslProperties = new HashMap<String, String>();saslProperties.put("javax.security.sasl.qop", "auth");saslProperties.put("javax.security.sasl.server.authentication", "true");CallbackHandler clientCallback = new SaslClientCallbackHandler();SaslClient saslClient = Sasl.createSaslClient(new String[]{mechanism}, saslUser, protocol, serverId, saslProperties, clientCallback);byte[] response = saslServer.evaluateResponse(new byte[0]);System.out.println("NEGOTIATE:" + new String(response));byte[] request = saslClient.evaluateChallenge(response);System.out.println("INITIATE:" + new String(request));byte[] response2 = saslServer.evaluateResponse(request);System.out.println("SUCCESS:" + new String(response2));System.out.println("server complete:" + saslServer.isComplete());saslClient.evaluateChallenge(response2);System.out.println("client complete:" + saslClient.isComplete());}public static class SaslDigestCallbackHandler implements CallbackHandler {@Overridepublic void handle(Callback[] callbacks) throws UnsupportedCallbackException {NameCallback nc = null;PasswordCallback pc = null;AuthorizeCallback ac = null;for (Callback callback : callbacks) {if (callback instanceof AuthorizeCallback) {ac = (AuthorizeCallback) callback;} else if (callback instanceof NameCallback) {nc = (NameCallback) callback;} else if (callback instanceof PasswordCallback) {pc = (PasswordCallback) callback;} else if (callback instanceof RealmCallback) {continue; // realm is ignored} else {throw new UnsupportedCallbackException(callback,"Unrecognized SASL DIGEST-MD5 Callback");}}if (pc != null) {pc.setPassword(PASSWORD);}if (ac != null) {String authid = ac.getAuthenticationID();String authzid = ac.getAuthorizationID();if (authid.equals(authzid)) {ac.setAuthorized(true);} else {ac.setAuthorized(false);}if (ac.isAuthorized()) {ac.setAuthorizedID(authzid);}}}}private static class SaslClientCallbackHandler implements CallbackHandler {private final String userName;private final char[] userPassword;public SaslClientCallbackHandler() {this.userName = USERNAME;this.userPassword = PASSWORD;}@Overridepublic void handle(Callback[] callbacks)throws UnsupportedCallbackException {NameCallback nc = null;PasswordCallback pc = null;RealmCallback rc = null;for (Callback callback : callbacks) {if (callback instanceof RealmChoiceCallback) {continue;} else if (callback instanceof NameCallback) {nc = (NameCallback) callback;} else if (callback instanceof PasswordCallback) {pc = (PasswordCallback) callback;} else if (callback instanceof RealmCallback) {rc = (RealmCallback) callback;} else {throw new UnsupportedCallbackException(callback,"Unrecognized SASL client callback");}}if (nc != null) {nc.setName(userName);}if (pc != null) {pc.setPassword(userPassword);}if (rc != null) {rc.setText(rc.getDefaultText());}}}
}
程序运行输出:
**NEGOTIATE:**realm=“default”,nonce=“alYJcFcQ1r8azJmG4E+9Vy4HJt7AfNyJIXhGCvcD”,charset=utf-8,algorithm=md5-sess
INITIATE:charset=utf-8,username=“tokentestuser”,realm=“default”,nonce=“alYJcFcQ1r8azJmG4E+9Vy4HJt7AfNyJIXhGCvcD”,nc=00000001,cnonce=“nA2o8sejSYExOtEt8ELnWJXob3KDHOIF2OlaxozQ”,digest-uri=“/default”,maxbuf=65536,response=e388c2b4a0f68f94607e01b033ef61b2,qop=auth
SUCCESS:rspauth=af3865533148b4f6539b785ce2958854
server complete:true
client complete:true
Client当收到server的negotiate response后,会通过某个算法生成response,然后发送initiate request。Server会通过同样的算法来生成自己的验证值来比较response,成功以后会同样的算法生成rspauth。Client收到rspauth以后,会用同样的算法来成自己的验证值来比较rspauth。
Token验证算法
protected byte[] generateResponseValue(String authMethod,String digestUriValue,String qopValue,String usernameValue,String realmValue,char[] passwdValue,byte[] nonceValue,byte[] cNonceValue,int nonceCount,byte[] authzidValue) throws NoSuchAlgorithmException,UnsupportedEncodingException,IOException {MessageDigest md5 = MessageDigest.getInstance("MD5");byte[] hexA1, hexA2;ByteArrayOutputStream A2, beginA1, A1, KD;// A2// --// A2 = { "AUTHENTICATE:", digest-uri-value,// [:00000000000000000000000000000000] } // if auth-int or auth-conf//A2 = new ByteArrayOutputStream();A2.write((authMethod + ":" + digestUriValue).getBytes(encoding));if (qopValue.equals("auth-conf") ||qopValue.equals("auth-int")) {logger.log(Level.FINE, "DIGEST04:QOP: {0}", qopValue);A2.write(SECURITY_LAYER_MARKER.getBytes(encoding));}if (logger.isLoggable(Level.FINE)) {logger.log(Level.FINE, "DIGEST05:A2: {0}", A2.toString());}md5.update(A2.toByteArray());byte[] digest = md5.digest();hexA2 = binaryToHex(digest);if (logger.isLoggable(Level.FINE)) {logger.log(Level.FINE, "DIGEST06:HEX(H(A2)): {0}", new String(hexA2));}// A1// --// H(user-name : realm-value : passwd)//beginA1 = new ByteArrayOutputStream();beginA1.write(stringToByte_8859_1(usernameValue));beginA1.write(':');// if no realm, realm will be an empty stringbeginA1.write(stringToByte_8859_1(realmValue));beginA1.write(':');beginA1.write(stringToByte_8859_1(new String(passwdValue)));md5.update(beginA1.toByteArray());digest = md5.digest();if (logger.isLoggable(Level.FINE)) {logger.log(Level.FINE, "DIGEST07:H({0}) = {1}",new Object[]{beginA1.toString(), new String(binaryToHex(digest))});}// A1// --// A1 = { H ( {user-name : realm-value : passwd } ),// : nonce-value, : cnonce-value : authzid-value//A1 = new ByteArrayOutputStream();A1.write(digest);A1.write(':');A1.write(nonceValue);A1.write(':');A1.write(cNonceValue);if (authzidValue != null) {A1.write(':');A1.write(authzidValue);}md5.update(A1.toByteArray());digest = md5.digest();H_A1 = digest; // Record H(A1). Use for integrity & privacy.hexA1 = binaryToHex(digest);if (logger.isLoggable(Level.FINE)) {logger.log(Level.FINE, "DIGEST08:H(A1) = {0}", new String(hexA1));}//// H(k, : , s);//KD = new ByteArrayOutputStream();KD.write(hexA1);KD.write(':');KD.write(nonceValue);KD.write(':');KD.write(nonceCountToHex(nonceCount).getBytes(encoding));KD.write(':');KD.write(cNonceValue);KD.write(':');KD.write(qopValue.getBytes(encoding));KD.write(':');KD.write(hexA2);if (logger.isLoggable(Level.FINE)) {logger.log(Level.FINE, "DIGEST09:KD: {0}", KD.toString());}md5.update(KD.toByteArray());digest = md5.digest();byte[] answer = binaryToHex(digest);if (logger.isLoggable(Level.FINE)) {logger.log(Level.FINE, "DIGEST10:response-value: {0}",new String(answer));}return (answer);}
本质上是把用户名密码和一些参数,放入MD5.update之中,最终生成一个MD5值。
值得注意的是生成response和rspauth时只有第一个参数authMethod不一样,一个为
AUTHENTICATE,一个为空字符串。
digestResp.write(generateResponseValue("AUTHENTICATE",digestUri, negotiatedQop, username,negotiatedRealm, passwd, nonce, cnonce,nonceCount, authzidBytes));byte[] expected = generateResponseValue("",digestUri, negotiatedQop, username, negotiatedRealm,passwd, nonce, cnonce, nonceCount, authzidBytes);
Token的统一管理
Hadoop中Delegation Tokens的生成和验证主要依赖于HMAC机制。但是实际的实现可以自定义。主要原因是由于生成和验证都是在server端实现。Token相关的rpc接口如下:
Token<DelegationTokenIdentifier> getDelegationToken(Text renewer) throws IOException;
long renewDelegationToken(Token<DelegationTokenIdentifier> token) throws IOException;
void cancelDelegationToken(Token<DelegationTokenIdentifier> token) throws IOException;
一般的密码生成实现是生成用户名密码,存入数据库,然后通过查表验证。Token的实现略有不同,由于是HMAC来生成密码,所以密码是实时生成的,但是要保存HMAC的key,类似于AES256算法的key,key也不是固定的,是会变化的,所以要记录key。所以Token的持久化主要是持久化key和Token,也是通过proto格式来存,在fsimage.proto中。
message SecretManagerSection {message DelegationKey {optional uint32 id = 1;optional uint64 expiryDate = 2;optional bytes key = 3;}message PersistToken {optional uint32 version = 1;optional string owner = 2;optional string renewer = 3;optional string realUser = 4;optional uint64 issueDate = 5;optional uint64 maxDate = 6;optional uint32 sequenceNumber = 7;optional uint32 masterKeyId = 8;optional uint64 expiryDate = 9;}optional uint32 currentId = 1;optional uint32 tokenSequenceNumber = 2;optional uint32 numKeys = 3;optional uint32 numTokens = 4;// repeated DelegationKey keys// repeated PersistToken tokens
}
独立站原文