zk源码—1.数据节点与Watcher机制及权限二
yuyutoo 2025-04-30 21:00 3 浏览 0 评论
大纲
1.ZooKeeper的数据模型、节点类型与应用
(1)数据模型之树形结构
(2)节点类型与特性(持久 + 临时 + 顺序 )
(3)节点的状态结构(各种zxid + 各种version)
(4)节点的版本(version + cversion + aversion)
(5)使用ZooKeeper实现锁(悲观锁 + 乐观锁)
2.发布订阅模式:用Watcher机制实现分布式通知
(1)Watcher机制是如何实现的
(2)Watcher机制的底层原理
(3)客户端Watcher注册实现过程
(4)服务端处理Watcher过程
(5)服务端Watch事件的触发过程
(6)客户端回调Watcher的处理过程
(7)利用Watcher实现发布订阅
(8)Watcher具有的特性
3.ACL权限控制:避免未经授权的访问
(1)ACL的使用(scheme:id:permission)
(2)实现自己的权限控制
(3)ACL内部实现原理之客户端处理过程
(4)ACL内部实现原理之服务端实现过程
(5)ACL权限总结
2.发布订阅模式:用Watcher机制实现分布式通知
(4)服务端处理Watcher过程
zk服务端处理Watcher事件基本有两个过程:
一.判断收到的请求是否需要注册Watcher事件
二.将对应的Watcher事件存储到WatchManager
以下是zk服务端处理Watcher的序列图:
zk服务端接收到客户端请求后的具体处理:
一.当服务端收到客户端标记了Watcher事件的getData请求时,会调用到FinalRequestProcessor的processRequest()方法,判断当前客户端请求是否需要注册Watcher事件。
二.当getDataRequest.getWatch()的值为true时,则表明当前客户端请求需要进行Watcher注册。
三.然后将当前的ServerCnxn对象(即Watcher事件)和数据节点路径,传入到zks.getZKDatabase()的getData()方法中来实现Watcher事件的注册,也就是实现存储Watcher事件到WatchManager中。具体就是:调用DataTree.dataWatches这个WatchManager的addWatch()方法,将该客户端请求的Watcher事件(也就是ServerCnxn对象)存储到DataTree.dataWatches这个WatchManager的两个HashMap(watchTable和watch2Paths)中。
补充说明:
首先,ServerCnxn对象代表了一个客户端和服务端的连接。ServerCnxn接口的默认实现是NIOServerCnxn,也可以选NettyServerCnxn。由于NIOServerCnxn和NettyServerCnxn都实现了Watcher的process接口,所以可以把ServerCnxn对象看作是一个Watcher对象。
然后,zk服务端的数据库DataTree中会有两个WatchManager,分别是dataWatches和childWatches,分别对应节点和子节点数据变更。
接着,WatchManager中有两个HashMap:watch2Paths和watchTable。当前的ServerCnxn对象和数据节点路径最终会被存储在这两HashMap中。watchTable可以根据数据节点路径来查找对应的Watcher,watch2Paths可以根据Watcher来查找对应的数据节点路径。
同时,WatchManager除了负责添加Watcher事件,还负责触发Watcher事件,以及移除那些已经被触发的Watcher事件。
public class FinalRequestProcessor implements RequestProcessor {
ZooKeeperServer zks;
...
public void processRequest(Request request) {
...
ServerCnxn cnxn = request.cnxn;
...
switch (request.type) {
...
case OpCode.getData: {
...
byte b[] = zks.getZKDatabase().getData(getDataRequest.getPath(), stat, getDataRequest.getWatch() ? cnxn : null);
rsp = new GetDataResponse(b, stat);
...
}
case OpCode.getChildren: {
...
List<String> children = zks.getZKDatabase().getChildren(getChildrenRequest.getPath(), null, getChildrenRequest.getWatch() ? cnxn : null);
rsp = new GetChildrenResponse(children);
...
}
}
...
}
}
public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider {
private ZKDatabase zkDb;
private FileTxnSnapLog txnLogFactory = null;
...
public ZKDatabase getZKDatabase() {
return this.zkDb;
}
...
}
public class ZKDatabase {
protected DataTree dataTree;
protected FileTxnSnapLog snapLog;
...
public byte[] getData(String path, Stat stat, Watcher watcher) {
return dataTree.getData(path, stat, watcher);
}
public List<String> getChildren(String path, Stat stat, Watcher watcher) {
return dataTree.getChildren(path, stat, watcher);
}
...
}
public class DataTree {
private final ConcurrentHashMap<String, DataNode> nodes = new ConcurrentHashMap<String, DataNode>();
private final WatchManager dataWatches = new WatchManager();
private final WatchManager childWatches = new WatchManager();
...
public byte[] getData(String path, Stat stat, Watcher watcher) {
DataNode n = nodes.get(path);
synchronized (n) {
n.copyStat(stat);
if (watcher != null) {
dataWatches.addWatch(path, watcher);
}
return n.data;
}
}
public List<String> getChildren(String path, Stat stat, Watcher watcher) {
DataNode n = nodes.get(path);
synchronized (n) {
n.copyStat(stat);
if (watcher != null) {
childWatches.addWatch(path, watcher);
}
return new ArrayList<String>(n.getChildren());
}
}
...
}
class WatchManager {
private static final Logger LOG = LoggerFactory.getLogger(WatchManager.class);
private final HashMap<String, HashSet<Watcher>> watchTable = new HashMap<String, HashSet<Watcher>>();
private final HashMap<Watcher, HashSet<String>> watch2Paths = new HashMap<Watcher, HashSet<String>>();
...
synchronized void addWatch(String path, Watcher watcher) {
HashSet<Watcher> list = watchTable.get(path);
if (list == null) {
list = new HashSet<Watcher>(4);
watchTable.put(path, list);
}
list.add(watcher);
HashSet<String> paths = watch2Paths.get(watcher);
if (paths == null) {
paths = new HashSet<String>();
watch2Paths.put(watcher, paths);
}
paths.add(path);
}
...
}
(5)服务端Watch事件的触发过程
对于标记了Watcher注册的请求,zk会将其对应的ServerCnxn对象(Watcher事件)存储到DataTree里的WatchManager的HashMap(watchTable和watch2Paths)中。之后,当服务端对指定节点进行数据更新后,会通过调用DataTree里的WatchManager的triggerWatch()方法来触发Watcher。
无论是触发DataTree的dataWatches,还是触发DataTree的childWatches,Watcher的触发逻辑都是一样的。
具体的Watcher触发逻辑如下:
步骤一:首先封装一个具有这三个属性的WatchedEvent对象:通知状态(KeeperState)、事件类型(EventType)、数据节点路径(path)。
步骤二:然后根据数据节点路径从DateTree的WatchManager中取出Watcher。如果为空,则说明没有任何客户端在该数据节点上注册过Watcher。如果存在,则将Watcher事件添加到自定义的Wathcers集合中,并且从DataTree的WatchManager的watchTable和watch2Paths中移除。最后调用Watcher的process()方法向客户端发送通知。
public class DataTree {
private final WatchManager dataWatches = new WatchManager();
private final WatchManager childWatches = new WatchManager();
...
public Stat setData(String path, byte data[], int version, long zxid, long time) {
Stat s = new Stat();
DataNode n = nodes.get(path);
byte lastdata[] = null;
synchronized (n) {
lastdata = n.data;
n.data = data;
n.stat.setMtime(time);
n.stat.setMzxid(zxid);
n.stat.setVersion(version);
n.copyStat(s);
}
...
dataWatches.triggerWatch(path, EventType.NodeDataChanged);
return s;
}
...
}
class WatchManager {
...
Set<Watcher> triggerWatch(String path, EventType type) {
return triggerWatch(path, type, null);
}
Set<Watcher> triggerWatch(String path, EventType type, Set<Watcher> supress) {
WatchedEvent e = new WatchedEvent(type, KeeperState.SyncConnected, path);
HashSet<Watcher> watchers;
synchronized (this) {
watchers = watchTable.remove(path);
for (Watcher w : watchers) {
HashSet<String> paths = watch2Paths.get(w);
if (paths != null) {
paths.remove(path);
}
}
}
for (Watcher w : watchers) {
if (supress != null && supress.contains(w)) {
continue;
}
w.process(e);
}
return watchers;
}
...
}
具体的Watcher的process()方法,会由NIOServerCnxn来实现。Watcher的process()方法的具体逻辑如下:
步骤一:标记响应头ReplyHeader的xid为-1,表示当前响应是一个通知。
步骤二:将触发WatchManager.triggerWatch()方法时封装的WatchedEvent,包装成WatcherEvent,以便进行网络传输序列化。
步骤三:向客户端发送响应。
public interface Watcher {
abstract public void process(WatchedEvent event);
...
}
public abstract class ServerCnxn implements Stats, Watcher {
...
public abstract void process(WatchedEvent event);
}
public class NIOServerCnxn extends ServerCnxn {
...
@Override
public void process(WatchedEvent event) {
ReplyHeader h = new ReplyHeader(-1, -1L, 0);
// Convert WatchedEvent to a type that can be sent over the wire
WatcherEvent e = event.getWrapper();
sendResponse(h, e, "notification");
}
}
(6)客户端回调Watcher的处理过程
对于来自服务端的响应:客户端会使用SendThread的readResponse()方法来进行统一处理。如果反序列化后得到的响应头replyHdr的xid为-1,则表明这是一个通知类型的响应。
SendThread接收事件通知的处理步骤如下:
步骤一:反序列化成WatcherEvent对象
zk客户端接收到请求后,首先将字节流反序列化成WatcherEvent对象。
步骤二:处理chrootPath
如果客户端设置了chrootPath为/app,而服务端响应的节点路径为/app/a,那么经过chrootPath处理后,就会统一变成一个相对路径:/a。
步骤三:还原成WatchedEvent对象
将WatcherEvent对象转换成WatchedEvent对象。
步骤四:回调Watcher
通过调用EventThread的queueEvent()方法,将WatchedEvent对象交给EventThread线程来回调Watcher。所以服务端的Watcher事件通知,最终会交给EventThread线程来处理。
public class ZooKeeper implements AutoCloseable {
protected final ClientCnxn cnxn;
protected final ZKWatchManager watchManager;
private final ZKClientConfig clientConfig;
...
}
public class ClientCnxn {
final SendThread sendThread;
final EventThread eventThread;
...
class SendThread extends ZooKeeperThread {
...
@Override
public void run() {
clientCnxnSocket.introduce(this, sessionId, outgoingQueue);
...
while (state.isAlive()) {
...
//将outgoingQueue里的请求发送出去 + 处理接收到的响应
clientCnxnSocket.doTransport(to, pendingQueue, ClientCnxn.this);
...
}
}
void readResponse(ByteBuffer incomingBuffer) {
ByteBufferInputStream bbis = new ByteBufferInputStream(incomingBuffer);
BinaryInputArchive bbia = BinaryInputArchive.getArchive(bbis);
ReplyHeader replyHdr = new ReplyHeader();
replyHdr.deserialize(bbia, "header");
...
//处理事务回调
if (replyHdr.getXid() == -1) {
WatcherEvent event = new WatcherEvent();
event.deserialize(bbia, "response");
if (chrootPath != null) {
String serverPath = event.getPath();
if (serverPath.compareTo(chrootPath) == 0)
event.setPath("/");
else if (serverPath.length() > chrootPath.length())
event.setPath(serverPath.substring(chrootPath.length()));
else
...
}
WatchedEvent we = new WatchedEvent(event);
eventThread.queueEvent( we );
return;
}
...
finishPacket(packet);
}
...
}
}
public class ClientCnxnSocketNIO extends ClientCnxnSocket {
...
@Override
void doTransport(int waitTimeOut, List<Packet> pendingQueue, ClientCnxn cnxn) {
...
doIO(pendingQueue, cnxn);
...
}
void doIO(List<Packet> pendingQueue, ClientCnxn cnxn) {
SocketChannel sock = (SocketChannel) sockKey.channel();
...
//处理接收响应
if (sockKey.isReadable()) {
...
sendThread.readResponse(incomingBuffer);
...
}
//处理发送请求
if (sockKey.isWritable()) {
Packet p = findSendablePacket(outgoingQueue, sendThread.tunnelAuthInProgress());
...
p.createBB();
sock.write(p.bb);
outgoingQueue.removeFirstOccurrence(p);
pendingQueue.add(p);
...
}
...
}
}
EventThread线程是zk客户端专门用来处理服务端事件通知的线程。
EventThread处理事件通知的步骤如下:
步骤一:EventThread的queueEvent()方法首先会根据WatchedEvent对象,从ZKWatchManager中取出所有注册过的客户端Watcher。
步骤二:然后从ZKWatchManager的管理中删除这些Watcher。这也说明客户端的Watcher机制是一次性的,触发后就会失效。
步骤三:接着将所有获取到的Watcher放入waitingEvents队列中。
步骤四:最后EventThread线程的run()方法,通过循环的方式,每次都会从waitingEvents队列中取出一个Watcher进行串行同步处理。也就是调用EventThread线程的processEvent()方法来最终执行实现了Watcher接口的process()方法,从而实现回调处理。
public class ClientCnxn {
final SendThread sendThread;
final EventThread eventThread;
private final ClientWatchManager watcher;
...
class EventThread extends ZooKeeperThread {
private final LinkedBlockingQueue<Object> waitingEvents = new LinkedBlockingQueue<Object>();
//通过以下这两个变量来实现waitingEvents为空时,加入的Watcher要马上执行,而不用等待run()方法
private volatile boolean wasKilled = false;
private volatile boolean isRunning = false;
...
public void queueEvent(WatchedEvent event) {
queueEvent(event, null);
}
private void queueEvent(WatchedEvent event, Set<Watcher> materializedWatchers) {
if (event.getType() == EventType.None && sessionState == event.getState()) {
return;
}
sessionState = event.getState();
final Set<Watcher> watchers;
if (materializedWatchers == null) {
//对WatchedEvent对象进行处理,从ZKWatchManager的管理中删除这些Watcher
watchers = watcher.materialize(event.getState(), event.getType(), event.getPath());
} else {
watchers = new HashSet<Watcher>();
watchers.addAll(materializedWatchers);
}
WatcherSetEventPair pair = new WatcherSetEventPair(watchers, event);
//将获取到的所有Watcher放入waitingEvents队列
waitingEvents.add(pair);
}
...
public void run() {
isRunning = true;
while (true) {
Object event = waitingEvents.take();
if (event == eventOfDeath) {
wasKilled = true;
} else {
processEvent(event);
}
if (wasKilled)
synchronized (waitingEvents) {
if (waitingEvents.isEmpty()) {
isRunning = false;
break;
}
}
}
}
}
private void processEvent(Object event) {
if (event instanceof WatcherSetEventPair) {
WatcherSetEventPair pair = (WatcherSetEventPair) event;
for (Watcher watcher : pair.watchers) {
watcher.process(pair.event);
}
}
...
}
...
}
}
public class ZooKeeper implements AutoCloseable {
...
static class ZKWatchManager implements ClientWatchManager {
private final Map<String, Set<Watcher>> dataWatches = new HashMap<String, Set<Watcher>>();
private final Map<String, Set<Watcher>> existWatches = new HashMap<String, Set<Watcher>>();
private final Map<String, Set<Watcher>> childWatches = new HashMap<String, Set<Watcher>>();
...
public Set<Watcher> materialize(Watcher.Event.KeeperState state, Watcher.Event.EventType type, String clientPath) {
Set<Watcher> result = new HashSet<Watcher>();
switch (type) {
...
case NodeDataChanged:
case NodeCreated:
synchronized (dataWatches) {
addTo(dataWatches.remove(clientPath), result);
}
synchronized (existWatches) {
addTo(existWatches.remove(clientPath), result);
}
break;
...
}
return result;
}
final private void addTo(Set<Watcher> from, Set<Watcher> to) {
if (from != null) {
to.addAll(from);
}
}
...
}
...
}
总结zk的Watcher机制处理过程:
一.zk是通过在客户端和服务端创建观察者信息列表来实现Watcher机制的。
二.客户端调用getData()、getChildren()、exist()等方法时,会将Watcher事件放到本地的ZKWatchManager中进行管理。
三.服务端在接收到客户端的请求后首先判断是否需要注册Watcher,若是则将ServerCnxn对象当成Watcher事件放入DataTree的WatchManager中。
四.服务端触发Watcher事件时,会根据节点路径从WatchManager中取出对应的Watcher,然后发送通知类型的响应给客户端。
五.客户端在接收到通知类型的响应后,首先通过SendThread线程提取出WatchedEvent对象。然后将WatchedEvent对象交给EventThread线程来回调Watcher。也就是查询本地的ZKWatchManager获得对应的Watcher事件,删除ZKWatchManager的Watcher并将Watcher放入waitingEvents队列。后续EventThread线程便会在其run()方法中串行出队waitingEvents,执行Watcher的process()回调。
客户端的Watcher管理器是ZKWatchManager。
服务端的Watcher管理器是WatchManager。
以上的处理设计实现了一个分布式环境下的观察者模式,通过将客户端和服务端处理Watcher事件时所需要的信息分别保存在两端,减少了彼此通信的内容,大大提升了服务的处理性能。
(7)利用Watcher实现发布订阅
一.发布订阅系统一般有推模式和拉模式
推模式是指服务端主动将数据更新发送给所有订阅的客户端,拉模式是指客户端主动发起请求来获取最新数据(定时轮询拉取)。
二.zk采用了推拉相结合来实现发布和订阅功能
首先客户端需要向服务端注册自己关注的节点(添加Watcher事件)。一旦该节点发生变更,服务端就会向客户端发送Watcher事件通知。客户端接收到消息通知后,需要主动到服务端获取最新的数据。
如果将配置信息放到zk上进行集中管理,那么应用启动时需要主动到zk服务端获取配置信息,然后在指定节点上注册一个Watcher监听。接着只要配置信息发生变更,zk服务端就会实时通知所有订阅的应用。从而让应用能实时获取到所订阅的配置信息节点已发生变更了的消息。
注意:原生zk客户端可以通过getData()、exists()、getChildren()三个方法,向zk服务端注册Watcher监听。而且注册到Watcher监听具有一次性,所以zk客户端获得服务端的节点变更通知后需要再次注册Watcher。
三.使用zk来实现发布订阅功能总结
步骤一:将配置信息存储到zk的节点上。
步骤二:应用启动时先从zk节点上获取配置信息,然后再向该zk节点注册一个数据变更的Watcher监听。一旦该zk节点数据发生变更,所有订阅的客户端就能收到数据变更通知。
步骤三:应用收到zk服务端发过来的数据变更通知后重新获取最新数据。
(8)Watcher具有的特性
一.一次性
无论是客户端还是服务端,一旦Watcher被触发或者回调,zk都会将其移除,所以使用zk的Watcher时需要反复注册。这样的设计能够有效减轻服务端的压力。否则,如果一个Watcher注册后一直有效,那么频繁更新的节点就会频繁发送通知给客户端,这样就会影响网络性能和服务端性能。
二.客户端串行执行
客户端Watcher回调的过程是一个串行同步的过程,这为我们保证了顺序。注意不要因一个Watcher的处理逻辑而影响整个客户端的Watcher回调。
三.轻量
WatchedEvent是Watcher机制的最小通知单元,WatchedEvent只包含三部分内容:通知状态、事件类型和节点路径。所以Watcher通知非常简单,只告诉客户端发生的事件,不包含具体内容。所以原始数据和变更后的数据无法从WatchedEvent中获取,需要客户端主动重新去获取数据。
客户端向服务端注册Watcher时:不会把客户端真实的Watcher对象传递到服务端,只会在客户端请求中使用boolean属性来标记Watcher对象,服务端也只会保存当前连接的ServerCnxn对象。
这种轻量的Watcher设计机制,在网络开销和服务端内存开销上都是很低的。
(9)总结
有一个问题:当服务端某一节点发生数据变更操作时,所有曾经设置了该节点监控事件的客户端都会收到服务器的通知吗?
答案是否定的,通过对zk内部实现机制的解析可以知道:Watcher事件的触发机制取决于会话的连接状态和客户端注册事件的类型,当客户端会话状态或数据节点发生改变时,都会触发对应的Watcher事件。Watcher具有一次性,曾经的监控要重新监控。
3.ACL权限控制:避免未经授权的访问
(1)ACL的使用(scheme:id:permission)
(2)实现自己的权限控制
(3)ACL内部实现原理之客户端处理过程
(4)ACL内部实现原理之服务端实现过程
(5)ACL权限总结
前面介绍完了数据模型、Watcher监控机制,并实现了在分布式环境中经常用到的分布式锁、配置管理等功能,这些功能的本质都在于操作数据节点。如果作为分布式锁或配置项的数据节点被错误删除或修改,那么对整个分布式系统有很大的影响,甚至会造成严重的生产事故。所以zk提供了一个很好的解决方案,那就是ACL权限控制。
(1)ACL的使用(scheme:id:permission)
如何使用zk的ACL机制来实现客户端对数据节点的访问控制。一个ACL权限设置通常可以分为3部分,分别是:权限模式(Scheme)、授权对象(ID)、权限信息(Permission),最终组成的一条ACL请求信息格式为"scheme:id:permission"。
一.权限模式:Scheme
权限模式就是用来设置zk服务器进行权限验证的方式。zk的权限验证方式大体分为两种类型:一种是范围验证,另外一种是口令验证。但具体来分,则有4种权限模式:
模式一:所谓的范围验证就是zk可针对一个IP或一段IP授予某种权限
比如通过"ip:192.168.0.11"让某机器对服务器的一数据节点具有写入权限,或者也可以通过"ip:192.168.0.11/22"给一段IP地址的机器赋权。
模式二:另一种权限模式就是口令验证,也可以理解为用户名密码的方式
在zk中这种验证方式是Digest认证。即在向客户端传送"username:password"这种形式的权限表示符时,服务端会对密码部分使用SHA-1和BASE64算法进行加密以保证安全。
模式三:还有一种权限模式Super可以认为是一种特殊的Digest认证
具有Super权限的客户端可以对zk上的任意数据节点进行任意操作。
模式四:最后一种授权模式是world模式
其实这种授权模式对应于系统中的所有用户,本质上起不到任何作用,设置了world权限模式系统中的所有用户操作都可以不进行权限验证。
下面代码给出了Digest模式下客户端的调用方式:
create /digest_node1
setAcl /digest_node1 digest:用户名:base64格式密码:rwadc
getAcl /digest_node1
addauth digest user:passwd
二.授权对象:ID
所谓的授权对象就是要把权限赋予谁,对应于4种不同的权限模式来说:
如果使用IP方式,那么授权对象可以是一个IP地址或IP地址段
如果使用Digest或Super方式,那么授权对象对应于一个用户名
如果使用World模式,那么就是授权系统中所有的用户
三.权限信息:Permission
权限就是指可以在数据节点上执行的操作种类,zk定义好的权限有5种:
数据节点(Create)创建权限,可以在该数据节点下创建子节点
数据节点(Wirte)更新权限,可以更新该数据节点
数据节点(Read)读取权限,可以读取该数据节点内容以及子节点信息
数据节点(Delete)删除权限,可以删除该数据节点的子节点
数据节点(Admin)管理者权限,可以对该数据节点体进行ACL权限设置
需要注意的是:每个节点都有维护自身的ACL权限数据,即使是该节点的子节点也有自己的ACL权限而不是直接继承其父节点权限。所以如果客户端只配置"/Config"节点的读取权限,该客户端是没有其子节点的"/Config/dataBase"的读取权限的。
(2)实现自己的权限控制
虽然zk自身的权限控制机制已经做得很细,但是zk还提供了一种权限扩展机制来让用户实现自己的权限控制方式。官方文档对这种机制的定义是"Pluggable ZooKeeper Authenication",意思是可插拔的授权机制,从名称上可看出它的灵活性。
那么这种机制是如何实现的呢?首先,要想实现自定义的权限控制机制,最核心的一点是实现zk提供的权限控制器接口AuthenticationProvider。然后,实现了自定义权限后,如何才能让服务端使用自定义的权限验证方式呢?接下来就需要将自定义的权限控制注册到服务端,而注册的方式有两种:第一种是通过设置系统属性来注册自定义的权限控制器,第二种是在配置文件zoo.cfg中进行配置。
//第一种注册方式
-Dzookeeper.authProvider.x=CustomAuthenticationProvider
//第二种方式
authProvider.x=CustomAuthenticationProvider
(3)ACL内部实现原理之客户端处理过程
下面深入到底层介绍zk是如何实现ACL权限控制机制的,先看一下客户端是如何操作的,以节点授权addAuth接口为例。
步骤一:客户端会通过ClientCnxn的addAuthInfo()方法,向服务端发送请求。
步骤二:addAuthInfo()方法会将scheme和auth封装成AuthPacket对象,并封装一个表示权限操作请求的RequestHeader对象。
步骤三:接着AuthPacket对象和RequestHeader对象会被封装到Packet对象中,最后会将该Packet对象添加到outgoingQueue队列,发送给服务端。
public class ZooKeeper implements AutoCloseable {
protected final ClientCnxn cnxn;
protected final HostProvider hostProvider;
protected final ZKWatchManager watchManager;
private final ZKClientConfig clientConfig;
...
public void addAuthInfo(String scheme, byte auth[]) {
cnxn.addAuthInfo(scheme, auth);
}
...
}
public class ClientCnxn {
private final CopyOnWriteArraySet<AuthData> authInfo = new CopyOnWriteArraySet<AuthData>();
volatile States state = States.NOT_CONNECTED;
...
public void addAuthInfo(String scheme, byte auth[]) {
if (!state.isAlive()) {
return;
}
authInfo.add(new AuthData(scheme, auth));
queuePacket(
new RequestHeader(-4, OpCode.auth), null,
new AuthPacket(0, scheme, auth),
null, null, null, null, null, null
);
}
public Packet queuePacket(RequestHeader h, ReplyHeader r, Record request, Record response, AsyncCallback cb, String clientPath, String serverPath, Object ctx, WatchRegistration watchRegistration, WatchDeregistration watchDeregistration) {
Packet packet = null;
packet = new Packet(h, r, request, response, watchRegistration);
packet.cb = cb;
packet.ctx = ctx;
packet.clientPath = clientPath;
packet.serverPath = serverPath;
packet.watchDeregistration = watchDeregistration;
synchronized (state) {
if (!state.isAlive() || closing) {
conLossPacket(packet);
} else {
if (h.getType() == OpCode.closeSession) {
closing = true;
}
outgoingQueue.add(packet);
}
}
sendThread.getClientCnxnSocket().packetAdded();
return packet;
}
...
}
ACL权限控制机制的客户端实现相对简单,只是封装请求类型为权限请求,方便服务器识别处理,而发送到服务器的信息包括前面提到的权限校验信息。
(4)ACL内部实现原理之服务端实现过程
相比于客户端的处理过程,服务器端对ACL内部实现就比较复杂。
一.当客户端发出的节点ACL授权请求到达服务端后
步骤一:首先调用NIOServerCnxn.readRequest()方法作为服务端处理的入口,而readRequest()方法其内部只是调用processPacket()方法。
public class NIOServerCnxn extends ServerCnxn {
private final ZooKeeperServer zkServer;
private ByteBuffer incomingBuffer = lenBuffer;
...
private void readRequest() throws IOException {
zkServer.processPacket(this, incomingBuffer);
}
...
}
步骤二:然后在ZooKeeperServer的processPacket()方法的内部,首先反序列化客户端的请求信息并封装到AuthPacket对象中,之后通过ProviderRegistry的getProvider()方法根据scheme判断具体实现类。
以Digest模式为例,该实现类是
DigestAuthenticationProvider,此时就会调用handleAuthentication方法进行权限验证。如果返回KeeperException.Code.OK则表示该请求已经通过了权限验证,如果返回的状态是其他或者抛出异常则表示权限验证失败。
所以权限认证的最终实现方法是handleAuthentication()方法,该方法的工作是解析客户端传递的权限验证类型,并通过addAuthInfo()方法将权限信息添加到authInfo集合中。
其中addAuthInfo()方法的作用是将解析到的权限信息存储到zk服务端的内存中,这些权限信息在整个会话存活期间会一直保存在zk服务端。如果会话关闭,那么权限信息就会被删除,这个特性类似于数据节点中的临时节点。
public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider {
...
public void processPacket(ServerCnxn cnxn, ByteBuffer incomingBuffer) throws IOException {
InputStream bais = new ByteBufferInputStream(incomingBuffer);
BinaryInputArchive bia = BinaryInputArchive.getArchive(bais);
RequestHeader h = new RequestHeader();
h.deserialize(bia, "header");
incomingBuffer = incomingBuffer.slice();
if (h.getType() == OpCode.auth) {
LOG.info("got auth packet " + cnxn.getRemoteSocketAddress());
AuthPacket authPacket = new AuthPacket();
ByteBufferInputStream.byteBuffer2Record(incomingBuffer, authPacket);
String scheme = authPacket.getScheme();
AuthenticationProvider ap = ProviderRegistry.getProvider(scheme);
Code authReturn = KeeperException.Code.AUTHFAILED;
if (ap != null) {
try {
authReturn = ap.handleAuthentication(cnxn, authPacket.getAuth());
} catch(RuntimeException e) {
authReturn = KeeperException.Code.AUTHFAILED;
}
}
if (authReturn == KeeperException.Code.OK) {
LOG.info("auth success " + cnxn.getRemoteSocketAddress());
ReplyHeader rh = new ReplyHeader(h.getXid(), 0, KeeperException.Code.OK.intValue());
cnxn.sendResponse(rh, null, null);
} else {
...
}
return;
} else {
...
}
cnxn.incrOutstandingRequests(h);
}
...
}
public class DigestAuthenticationProvider implements AuthenticationProvider {
...
public KeeperException.Code handleAuthentication(ServerCnxn cnxn, byte[] authData) {
String id = new String(authData);
try {
String digest = generateDigest(id);
if (digest.equals(superDigest)) {
cnxn.addAuthInfo(new Id("super", ""));
}
cnxn.addAuthInfo(new Id(getScheme(), digest));
return KeeperException.Code.OK;
} catch (NoSuchAlgorithmException e) {
LOG.error("Missing algorithm",e);
}
return KeeperException.Code.AUTHFAILED;
}
...
}
public abstract class ServerCnxn implements Stats, Watcher {
protected ArrayList<Id> authInfo = new ArrayList<Id>();
...
public void addAuthInfo(Id id) {
//将权限信息添加到authInfo集合
if (authInfo.contains(id) == false) {
authInfo.add(id);
}
}
}
二.当服务端已将客户端ACL授权请求解析并将对应的会话权限信息存储好后
服务端处理一次请求时,是如何进行权限验证的?
首先通过PrepRequestProcessor中的checkAcl()方法检查对应的请求权限。如果该节点没有任何权限设置则直接返回,如果该节点有权限设置则循环遍历节点的权限信息进行检查,如果具有相应的权限则直接返回表明权限认证成功,否则直接抛出NoAuthException异常表明权限认证失败。
public class PrepRequestProcessor extends ZooKeeperCriticalThread implements RequestProcessor {
...
protected void pRequest2Txn(int type, long zxid, Request request, Record record, boolean deserialize) {
...
case OpCode.delete:
...
checkACL(zks, parentRecord.acl, ZooDefs.Perms.DELETE, request.authInfo);
...
case OpCode.setData:
...
checkACL(zks, nodeRecord.acl, ZooDefs.Perms.WRITE, request.authInfo);
...
...
}
static void checkACL(ZooKeeperServer zks, List<ACL> acl, int perm, List<Id> ids) {
...
for (ACL a : acl) {
if (authId.getScheme().equals(id.getScheme()) && ap.matches(authId.getId(), id.getId())) {
return;
}
}
throw new KeeperException.NoAuthException();
}
...
}
(5)ACL权限总结
客户端发送ACL权限请求时的处理:首先会封装请求类型,然后将权限信息封装到request中,最后发送request给服务端。
服务器对ACL权限请求的授权处理:首先分析请求类型是否是权限相关操作,然后根据不同的权限模式调用不同的实现类验证权限,最后存储权限信息。
注意:会话的授权信息存储在服务端内存。如果客户端会话关闭,授权信息会被删除。下次连接服务器后,需要重新调用授权接口进行授权;
zk作为分布式系统协调框架,往往在一个分布式系统下起到关键的作用。尤其是在分布式锁、配置管理等应用场景中。如果因错误操作对重要数据节点进行变更或删除,对整个系统影响很大,甚至可能会导致整个分布式服务不可用,所以设计使用zk时一定要考虑对关键节点添加权限控制。
问题:如果一个客户端对服务器上的一个节点设置了只有它自己才能操作的权限,那么等该客户端下线后,对其创建的节点要想进行修改应该怎么做?
可以通过"super模式"删除该节点或变更该节点的权限验证方式,正因为"super模式"有如此大的权限,在平时使用时应更加谨慎。
相关推荐
- ETCD 故障恢复(etc常见故障)
-
概述Kubernetes集群外部ETCD节点故障,导致kube-apiserver无法启动。...
- 在Ubuntu 16.04 LTS服务器上安装FreeRADIUS和Daloradius的方法
-
FreeRADIUS为AAARadiusLinux下开源解决方案,DaloRadius为图形化web管理工具。...
- 如何排查服务器被黑客入侵的迹象(黑客 抓取服务器数据)
-
---排查服务器是否被黑客入侵需要系统性地检查多个关键点,以下是一份详细的排查指南,包含具体命令、工具和应对策略:---###**一、快速初步检查**####1.**检查异常登录记录**...
- 使用 Fail Ban 日志分析 SSH 攻击行为
-
通过分析`fail2ban`日志可以识别和应对SSH暴力破解等攻击行为。以下是详细的操作流程和关键分析方法:---###**一、Fail2ban日志位置**Fail2ban的日志路径因系统配置...
- 《5 个实用技巧,提升你的服务器安全性,避免被黑客盯上!》
-
服务器的安全性至关重要,特别是在如今网络攻击频繁的情况下。如果你的服务器存在漏洞,黑客可能会利用这些漏洞进行攻击,甚至窃取数据。今天我们就来聊聊5个实用技巧,帮助你提升服务器的安全性,让你的系统更...
- 聊聊Spring AI Alibaba的YuQueDocumentReader
-
序本文主要研究一下SpringAIAlibaba的YuQueDocumentReaderYuQueDocumentReader...
- Mac Docker环境,利用Canal实现MySQL同步ES
-
Canal的使用使用docker环境安装mysql、canal、elasticsearch,基于binlog利用canal实现mysql的数据同步到elasticsearch中,并在springboo...
- RustDesk:开源远程控制工具的技术架构与全场景部署实战
-
一、开源远程控制领域的革新者1.1行业痛点与解决方案...
- 长安汽车一代CS75Plus2020款安装高德地图7.5
-
不用破解原车机,一代CS75Plus2020款,安装车机版高德地图7.5,有红绿灯读秒!废话不多讲,安装步骤如下:一、在拨号状态输入:在电话拨号界面,输入:*#518200#*(进入安卓设置界面,...
- Zookeeper使用详解之常见操作篇(zookeeper ui)
-
一、Zookeeper的数据结构对于ZooKeeper而言,其存储结构类似于文件系统,也是一个树形目录服务,并通过Key-Value键值对的形式进行数据存储。其中,Key由斜线间隔的路径元素构成。对...
- zk源码—4.会话的实现原理一(会话层的基本功能是什么)
-
大纲1.创建会话...
- Zookeeper 可观测性最佳实践(zookeeper能够确保)
-
Zookeeper介绍ZooKeeper是一个开源的分布式协调服务,用于管理和协调分布式系统中的节点。它提供了一种高效、可靠的方式来解决分布式系统中的常见问题,如数据同步、配置管理、命名服务和集群...
- 服务器密码错误被锁定怎么解决(服务器密码错几次锁)
-
#服务器密码错误被锁定解决方案当服务器因多次密码错误导致账户被锁定时,可以按照以下步骤进行排查和解决:##一、确认锁定状态###1.检查账户锁定状态(Linux)```bash#查看账户锁定...
- zk基础—4.zk实现分布式功能(分布式zk的使用)
-
大纲1.zk实现数据发布订阅...
- 《死神魂魄觉醒》卡死问题终极解决方案:从原理到实战的深度解析
-
在《死神魂魄觉醒》的斩魄刀交锋中,游戏卡死犹如突现的虚圈屏障,阻断玩家与尸魂界的连接。本文将从技术架构、解决方案、预防策略三个维度,深度剖析卡死问题的成因与应对之策,助力玩家突破次元壁障,畅享灵魂共鸣...
你 发表评论:
欢迎- 一周热门
-
-
前端面试:iframe 的优缺点? iframe有那些缺点
-
带斜线的表头制作好了,如何填充内容?这几种方法你更喜欢哪个?
-
漫学笔记之PHP.ini常用的配置信息
-
推荐7个模板代码和其他游戏源码下载的网址
-
其实模版网站在开发工作中很重要,推荐几个参考站给大家
-
[干货] JAVA - JVM - 2 内存两分 [干货]+java+-+jvm+-+2+内存两分吗
-
正在学习使用python搭建自动化测试框架?这个系统包你可能会用到
-
织梦(Dedecms)建站教程 织梦建站详细步骤
-
【开源分享】2024PHP在线客服系统源码(搭建教程+终身使用)
-
2024PHP在线客服系统源码+完全开源 带详细搭建教程
-
- 最近发表
-
- ETCD 故障恢复(etc常见故障)
- 在Ubuntu 16.04 LTS服务器上安装FreeRADIUS和Daloradius的方法
- 如何排查服务器被黑客入侵的迹象(黑客 抓取服务器数据)
- 使用 Fail Ban 日志分析 SSH 攻击行为
- 《5 个实用技巧,提升你的服务器安全性,避免被黑客盯上!》
- 聊聊Spring AI Alibaba的YuQueDocumentReader
- Mac Docker环境,利用Canal实现MySQL同步ES
- RustDesk:开源远程控制工具的技术架构与全场景部署实战
- 长安汽车一代CS75Plus2020款安装高德地图7.5
- Zookeeper使用详解之常见操作篇(zookeeper ui)
- 标签列表
-
- mybatis plus (70)
- scheduledtask (71)
- css滚动条 (60)
- java学生成绩管理系统 (59)
- 结构体数组 (69)
- databasemetadata (64)
- javastatic (68)
- jsp实用教程 (53)
- fontawesome (57)
- widget开发 (57)
- vb net教程 (62)
- hibernate 教程 (63)
- case语句 (57)
- svn连接 (74)
- directoryindex (69)
- session timeout (58)
- textbox换行 (67)
- extension_dir (64)
- linearlayout (58)
- vba高级教程 (75)
- iframe用法 (58)
- sqlparameter (59)
- trim函数 (59)
- flex布局 (63)
- contextloaderlistener (56)