如何查看zookeeper 查看节点是否启动

zookeeper源码学习 - @用小弟弟思考的程序员 - ITeye技术网站
博客分类:
上周在同事的唆使下,我抽空把zookeeper的实现研究了下,把分享帖给大家:
1. paxos算法介绍
如何达成一致性是分布式系统上的一个经典问题,而paxos就是用来解决这个问题的。
对于该算法的介绍,可以先读一下此文:
http://zh.wikipedia.org/wiki/Paxos%E7%AE%97%E6%B3%95
说白了,就是针对于一个提案,只要半数以上成员达成一致,那么提案就被通过了。若是有部分成员记性差,忘记了上次的提案,只要仍旧有半数以上的成员还记得,就可以把之前的一致告诉他。
在经典的paxos算法中,有如下几个成员:
1. proposers:提议发起者
2. acceptors:提案处理者
3. learners:提案学习者(只能学习被通过的提案)
提案的处理分成2个阶段:
1. prepare 阶段:
proposer 选择一个提案编号 n 并将 prepare 请求发送给 acceptors 中的一个多数派;
acceptor 收到 prepare 消息后,如果提案的编号大于它已经回复的所有 prepare 消息,则 acceptor 将自己上次接受的提案回复给 proposer,并承诺不再回复小于 n 的提案;
2. 批准阶段:
当一个 proposor 收到了多数 acceptors 对 prepare 的回复后,就进入批准阶段。它要向回复 prepare 请求的 acceptors 发送 accept 请求,包括编号 n 和之前已经accept正在处理中的提案。
在不违背自己向其他 proposer 的承诺的前提下,acceptor 收到 accept 请求后即接受这个请求。
这个过程在任何时候中断都可以保证正确性。例如如果一个 proposer 发现已经有其他 proposers 提出了编号更高的提案,则有必要中断这个过程。因此为了优化,在上述 prepare 过程中,如果一个 acceptor 发现存在一个更高编号的提案,则需要通知 proposer,提醒其中断这次提案。
流程图如下:
2.Zookeeper介绍
Zookeeper是paxos算法的实现,它的默认实现(FastLeaderElection)对经典paxos的2个阶段处理进行了改进,改为了一个阶段。
处理流程图如下:
这里需要对几个角色做下解释:
1. proposer:提案发起者
2. Leader:一致达成之后的领导者,当有人忘记之前提案的时候,可以和它进行同步。
3. Follower:可以发起投票的参与者。
4. Observer:只能接受数据,但不能发起投票。
接下来,我们看下zookeeper的启动流程:
启动流程:
QuorumPeerMain-&initializeAndRun
&&&&&&&&&&&&&&&&&&&&&&&&&&& |_NIOServerCnxn.Factory
&&&&&&&&&&&&&&&&&&&&&&&&&&& |_quorumPeer
&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&& |_start
&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&& |_zkDb.loadDataBase()
&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&& |_cnxnFactory.start()-&doIO
&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&& |_startLeaderElection-&createElectionAlgorithm-&listener.start
&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&& |_super.start
Zookeeper有2种工作模式,standalone模式和集群模式,当配置的server小于等于1台则为单机模式,这里我们讲的是集群模式。
quorumPeer为集群中每台机器还未确定角色的名称。
启动步骤为:
1. 从本地加载数据。
2. 启动cnxnFactory用于监听客户端和follower的请求。
3. startLeaderElection启动选举算法,默认为FastLeaderElection
4. 启动选举结果监听器,用于达成一致性。
5. quorumPeer线程主循环启动,在选举成功后,进入自己的角色。还未投票前为LOOKING
成功后为:LEADING,OBSERVING,或FOLLOWING
假如为leading,则初始化leader,过程为:
Leader.lead()-&LearnerCnxAcceptor-&LearnerHandler
LearnerHandler这个线程处理所有Learner(包括Follower和Observer)的交互逻辑。从Learner发来的消息有以下几种:
1.ACK消息。这是Follower对PROPOSAL消息的响应。Leader收到这个消息后,判断对应的PROPOSAL如果有过半的voter通过,则发送commit请求到CommitProcessor线程的CommittedRequest队列,并且发送Commit消息给所有Follower,发送INFORM消息给所有Observer(告诉这个Proposal通过了)。
2. REQUEST消息。这是Follower转发来的写请求,或者同步请求。转交给PrepRequestProcessor线程处理(放入其submittedRequests队列)
3. PING消息。Learner的心跳消息
4. REVALIDATE消息。用来延长session有效时间
另外,LeaderZooKeeperServer也会在lead函数中被初始化,并setupRequestProcessors
这样,所有进入leader的请求都会走流程:
PrepRequestProcessor-&ProposalRequestProcessor-&CommitProcessor-&ToBeAppliedRequestProcessor-&FinalRequestProcessor
假如是读请求,那么PrepRequestProcessor会直接响应用户。
假如是写请求,那么PrepRequestProcessor会向Follower发送PROPOSAL请求。
然后,把request放入到CommitProcessor的queuedRequests,当committedRequests中收到LearnerHandler接受到Follower对PROPOSAL消息的响应消息后。则CommitProcessor处理该request,并进入后续流程。
大致过程如上所述,如有疑问,有空一起交流下。
例如如果一个 proposer 发现已经有其他 proposers 提出了编号更高的提案,则有必要中断这个过程为什么要打断呢,打断不是造成混乱吗?b.图1中的response 由learner返回觉得不对,而图2中的才是正确的.c.2中说是改为一阶段,其实本质是还是2个阶段,只是leader封装了吧.sincerely
lingqi1818
浏览: 138167 次
来自: 杭州
有几点要请教下;a.在二阶段里有这样一句:引用例如如果一个 p ...
不错,我也遇到了第一个问题
xiaoych 写道很好,研究了一年多了吧,哈哈
难得你上i ...
很好,研究了一年多了吧,哈哈
pengpeng 写道很强大。我觉得mas-slave那块可以 ... 上传我的文档
 下载
 收藏
该文档贡献者很忙,什么也没留下。
 下载此文档
正在努力加载中...
解决zookeeper linux下无法启动的问题
下载积分:30
内容提示:解决zookeeper linux下无法启动的问题
文档格式:PDF|
浏览次数:137|
上传日期: 05:40:57|
文档星级:
该用户还上传了这些文档
解决zookeeper linux下无法启动的问题
官方公共微信深入浅出Zookeeper之五
Leader选举 - 吊丝码农 - ITeye技术网站
前面几篇文章简单介绍了zookeeper的单机server client处理。接下来几篇文章会介绍分布式部署下zookeeper的实现原理。我们假设有3台server的集群,zoo.cfg配置如下
tickTime=2000
dataDir=/home/admin/zk-data
clientPort=2181
#Learner初始化连接到Leader的超时时间
initLimit=10
#Learner和Leader之间消息发送,响应的超时时间
syncLimit=5
#集群配置,3台机器,2888为Leader服务端口,3888为选举时所用的端口
server.1=master:
server.2=slave1:
server.3=slave2:
在server.1的$dataDir下
echo '1'&myid
启动server.1
./zkServer.sh start
分析之前先看看选举相关的类图
入口函数QuorumPeerMain主线程启动
public void runFromConfig(QuorumPeerConfig config) throws IOException {
("Starting quorum peer");
//对client提供读写的server,一般是2181端口
ServerCnxnFactory cnxnFactory = ServerCnxnFactory.createFactory();
cnxnFactory.configure(config.getClientPortAddress(),
config.getMaxClientCnxns());
//zk的逻辑主线程,负责选举,投票等
quorumPeer = new QuorumPeer();
quorumPeer.setClientPortAddress(config.getClientPortAddress());
quorumPeer.setTxnFactory(new FileTxnSnapLog(
new File(config.getDataLogDir()),
new File(config.getDataDir())));
//集群机器地址
quorumPeer.setQuorumPeers(config.getServers());
quorumPeer.setElectionType(config.getElectionAlg());
//本机的集群编号
quorumPeer.setMyid(config.getServerId());
quorumPeer.setTickTime(config.getTickTime());
quorumPeer.setMinSessionTimeout(config.getMinSessionTimeout());
quorumPeer.setMaxSessionTimeout(config.getMaxSessionTimeout());
quorumPeer.setInitLimit(config.getInitLimit());
quorumPeer.setSyncLimit(config.getSyncLimit());
//投票决定方式,默认超过半数就通过
quorumPeer.setQuorumVerifier(config.getQuorumVerifier());
quorumPeer.setCnxnFactory(cnxnFactory);
quorumPeer.setZKDatabase(new ZKDatabase(quorumPeer.getTxnFactory()));
quorumPeer.setLearnerType(config.getPeerType());
//启动主线程
quorumPeer.start();
quorumPeer.join();
} catch (InterruptedException e) {
// warn, but generally this is ok
LOG.warn("Quorum Peer interrupted", e);
QuorumPeer复写Thread.start方法,启动
public synchronized void start() {
//恢复DB,从zxid中回复epoch变量,代表投票轮数
loadDataBase();
//启动针对client的IO线程
cnxnFactory.start();
//选举初始化,主要是从配置获取选举类型
startLeaderElection();
super.start();
loadDataBase过程,恢复epoch数
private void loadDataBase() {
//从本地文件恢复db
zkDb.loadDataBase();
// load the epochs
//从最新的zxid恢复epoch变量,zxid64位,前32位是epoch值,后32位是zxid
long lastProcessedZxid = zkDb.getDataTree().lastProcessedZ
long epochOfZxid = ZxidUtils.getEpochFromZxid(lastProcessedZxid);
currentEpoch = readLongFromFile(CURRENT_EPOCH_FILENAME);
} catch(FileNotFoundException e) {
// pick a reasonable epoch number
// this should only happen once when moving to a
// new code version
currentEpoch = epochOfZ
(CURRENT_EPOCH_FILENAME
+ " not found! Creating with a reasonable default of {}. This should only happen when you are upgrading your installation",
currentEpoch);
writeLongToFile(CURRENT_EPOCH_FILENAME, currentEpoch);
if (epochOfZxid & currentEpoch) {
throw new IOException("The current epoch, " + ZxidUtils.zxidToString(currentEpoch) + ", is older than the last zxid, " + lastProcessedZxid);
选举初始化
synchronized public void startLeaderElection() {
//先投自己
currentVote = new Vote(myid, getLastLoggedZxid(), getCurrentEpoch());
} catch(IOException e) {
RuntimeException re = new RuntimeException(e.getMessage());
re.setStackTrace(e.getStackTrace());
//从配置中拿自己的选举地址
for (QuorumServer p : getView().values()) {
if (p.id == myid) {
myQuorumAddr = p.
//根据配置,获取选举算法
this.electionAlg = createElectionAlgorithm(electionType);
获取选举算法,默认为FastLeaderElection算法
protected Election createElectionAlgorithm(int electionAlgorithm){
Election le=
//TODO: use a factory rather than a switch
switch (electionAlgorithm) {
le = new LeaderElection(this);
le = new AuthFastLeaderElection(this);
le = new AuthFastLeaderElection(this, true);
//leader选举IO负责类
qcm = new QuorumCnxManager(this);
QuorumCnxManager.Listener listener = qcm.
//启动已绑定3888端口的选举线程,等待集群其他机器连接
if(listener != null){
listener.start();
//基于TCP的选举算法
le = new FastLeaderElection(this, qcm);
LOG.error("Null listener when initializing cnx manager");
FastLeaderElection初始化
private void starter(QuorumPeer self, QuorumCnxManager manager) {
this.self =
proposedLeader = -1;
proposedZxid = -1;
//业务层发送队列,业务对象ToSend
sendqueue = new LinkedBlockingQueue&ToSend&();
//业务层接受队列,业务对象Notificataion
recvqueue = new LinkedBlockingQueue&Notification&();
this.messenger = new Messenger(manager);
Messenger(QuorumCnxManager manager) {
//启动业务层发送线程,将消息发给IO负责类QuorumCnxManager
this.ws = new WorkerSender(manager);
Thread t = new Thread(this.ws,
"WorkerSender[myid=" + self.getId() + "]");
t.setDaemon(true);
t.start();
//启动业务层接受线程,从IO负责类QuorumCnxManager接受消息
this.wr = new WorkerReceiver(manager);
t = new Thread(this.wr,
"WorkerReceiver[myid=" + self.getId() + "]");
t.setDaemon(true);
t.start();
QuorumPeer线程启动
* Main loop
while (running) {
switch (getPeerState()) {
//如果状态是LOOKING,则进入选举流程
case LOOKING:
("LOOKING");
//选举算法开始选举,主线程可能在这里耗比较长时间
setCurrentVote(makeLEStrategy().lookForLeader());
} catch (Exception e) {
LOG.warn("Unexpected exception", e);
setPeerState(ServerState.LOOKING);
//其他流程处理
case OBSERVING:
("OBSERVING");
setObserver(makeObserver(logFactory));
observer.observeLeader();
} catch (Exception e) {
LOG.warn("Unexpected exception",e );
} finally {
observer.shutdown();
setObserver(null);
setPeerState(ServerState.LOOKING);
case FOLLOWING:
("FOLLOWING");
setFollower(makeFollower(logFactory));
follower.followLeader();
} catch (Exception e) {
LOG.warn("Unexpected exception",e);
} finally {
follower.shutdown();
setFollower(null);
setPeerState(ServerState.LOOKING);
case LEADING:
("LEADING");
setLeader(makeLeader(logFactory));
leader.lead();
setLeader(null);
} catch (Exception e) {
LOG.warn("Unexpected exception",e);
} finally {
if (leader != null) {
leader.shutdown("Forcing shutdown");
setLeader(null);
setPeerState(ServerState.LOOKING);
进入选举流程
public Vote lookForLeader() throws InterruptedException {
//收到的投票
HashMap&Long, Vote& recvset = new HashMap&Long, Vote&();
HashMap&Long, Vote& outofelection = new HashMap&Long, Vote&();
int notTimeout = finalizeW
synchronized(this){
logicalclock++;
//先投给自己
updateProposal(getInitId(), getInitLastLoggedZxid(), getPeerEpoch());
("New election. My id =
" + self.getId() +
", proposed zxid=0x" + Long.toHexString(proposedZxid));
//发送投票,包括发给自己
sendNotifications();
* Loop in which we exchange notifications until we find a leader
//主循环,直到选出leader
while ((self.getPeerState() == ServerState.LOOKING) &&
* Remove next notification from queue, times out after 2 times
* the termination time
//从IO线程里拿到投票消息,自己的投票也在这里处理
Notification n = recvqueue.poll(notTimeout,
TimeUnit.MILLISECONDS);
* Sends more notifications if haven't received enough.
* Otherwise processes new notification.
//如果空闲
if(n == null){
//消息发完了,继续发送,一直到选出leader为止
if(manager.haveDelivered()){
sendNotifications();
//消息还在,可能其他server还没启动,尝试连接
manager.connectAll();
* Exponential backoff
//延长超时时间
int tmpTimeOut = notTimeout*2;
notTimeout = (tmpTimeOut & maxNotificationInterval?
tmpTimeOut : maxNotificationInterval);
("Notification time out: " + notTimeout);
//收到了投票消息
else if(self.getVotingView().containsKey(n.sid)) {
* Only proceed if the vote comes from a replica in the
* voting view.
switch (n.state) {
//LOOKING消息,则
case LOOKING:
//检查下收到的这张选票是否可以胜出,依次比较选举轮数epoch,事务zxid,服务器编号server id
} else if (totalOrderPredicate(n.leader, n.zxid, n.peerEpoch,
proposedLeader, proposedZxid, proposedEpoch)) {
//胜出了,就把自己的投票修改为对方的,然后广播消息
updateProposal(n.leader, n.zxid, n.peerEpoch);
sendNotifications();
//添加到本机投票集合,用来做选举终结判断
recvset.put(n.sid, new Vote(n.leader, n.zxid, n.electionEpoch, n.peerEpoch));
//选举是否结束,默认算法是超过半数server同意
if (termPredicate(recvset,
new Vote(proposedLeader, proposedZxid,
logicalclock, proposedEpoch))) {
//修改状态,LEADING or FOLLOWING
self.setPeerState((proposedLeader == self.getId()) ?
ServerState.LEADING: learningState());
//返回最终的选票结果
Vote endVote = new Vote(proposedLeader,
proposedZxid, proposedEpoch);
leaveInstance(endVote);
return endV
//如果收到的选票状态不是LOOKING,比如这台机器刚加入一个已经服务的zk集群时
//OBSERVING机器不参数选举
case OBSERVING:
LOG.debug("Notification from observer: " + n.sid);
//这2种需要参与选举
case FOLLOWING:
case LEADING:
* Consider all notifications from the same epoch
* together.
if(n.electionEpoch == logicalclock){
//同样需要加入到本机的投票集合
recvset.put(n.sid, new Vote(n.leader, n.zxid, n.electionEpoch, n.peerEpoch));
//投票是否结束,如果结束,再确认LEADER是否有效
//如果结束,修改自己的状态并返回投票结果
if(termPredicate(recvset, new Vote(n.leader,
n.zxid, n.electionEpoch, n.peerEpoch, n.state))
&& checkLeader(outofelection, n.leader, n.electionEpoch)) {
self.setPeerState((n.leader == self.getId()) ?
ServerState.LEADING: learningState());
Vote endVote = new Vote(n.leader, n.zxid, n.peerEpoch);
leaveInstance(endVote);
return endV
* Before joining an established ensemble, verify that
* a majority are following the same leader.
outofelection.put(n.sid, new Vote(n.leader, n.zxid,
n.electionEpoch, n.peerEpoch, n.state));
选举消息发送
private void sendNotifications() {
//循环发送
for (QuorumServer server : self.getVotingView().values()) {
long sid = server.
//消息实体
ToSend notmsg = new ToSend(ToSend.mType.notification,
proposedLeader,
proposedZxid,
logicalclock,
QuorumPeer.ServerState.LOOKING,
proposedEpoch);
//添加到业务的发送队列,该队列会被WorkerSender消费
sendqueue.offer(notmsg);
WorkerSender消费
public void run() {
while (!stop) {
ToSend m = sendqueue.poll(3000, TimeUnit.MILLISECONDS);
if(m == null)
process(m);
} catch (InterruptedException e) {
("WorkerSender is down");
private void process(ToSend m) {
//选票协议是固定的
byte requestBytes[] = new byte[36];
ByteBuffer requestBuffer = ByteBuffer.wrap(requestBytes);
* Building notification packet to send
requestBuffer.clear();
requestBuffer.putInt(m.state.ordinal());
requestBuffer.putLong(m.leader);
requestBuffer.putLong(m.zxid);
requestBuffer.putLong(m.electionEpoch);
requestBuffer.putLong(m.peerEpoch);
//通过QuorumCnxManager这个IO负责类发送消息
manager.toSend(m.sid, requestBuffer);
QuorumCnxManager具体发送
public void toSend(Long sid, ByteBuffer b) {
* If sending message to myself, then simply enqueue it (loopback).
//如果是自己,不走网络,直接添加到本地接受队列
if (self.getId() == sid) {
b.position(0);
addToRecvQueue(new Message(b.duplicate(), sid));
* Otherwise send to the corresponding thread to send.
//否则,先添加到发送队列,然后尝试连接,连接成功则给每台server启动发送和接受线程
* Start a new connection if doesn't have one already.
if (!queueSendMap.containsKey(sid)) {
ArrayBlockingQueue&ByteBuffer& bq = new ArrayBlockingQueue&ByteBuffer&(
SEND_CAPACITY);
queueSendMap.put(sid, bq);
addToSendQueue(bq, b);
ArrayBlockingQueue&ByteBuffer& bq = queueSendMap.get(sid);
if(bq != null){
addToSendQueue(bq, b);
LOG.error("No queue for server " + sid);
connectOne(sid);
尝试连接过程
synchronized void connectOne(long sid){
if (senderWorkerMap.get(sid) == null){
//对方的选举地址,3888端口
electionAddr = self.quorumPeers.get(sid).electionA
//同步IO连接
Socket sock = new Socket();
setSockOpts(sock);
sock.connect(self.getView().get(sid).electionAddr, cnxTO);
if (LOG.isDebugEnabled()) {
LOG.debug("Connected to server " + sid);
//连上了,初始化IO线程
initiateConnection(sock, sid);
由于这个时候只有server.1启动,当它尝试去连接其他server时,会报错,选举线程会一直重试。此时,server.1只收到了自己的选票。然后我们启动server.2,server.2也会主动去连接server.1,这个时候server.1h和server.2会相互发起连接,但最终只有有一个连接成功,请看下问。
这个时候被连接的server的Listener选举线程会收到新连接
Listener主循环,接受连接
while (!shutdown) {
Socket client = ss.accept();
setSockOpts(client);
("Received connection request "
+ client.getRemoteSocketAddress());
receiveConnection(client);
numRetries = 0;
新连接处理
public boolean receiveConnection(Socket sock) {
Long sid =
// Read server id
//读server id
DataInputStream din = new DataInputStream(sock.getInputStream());
sid = din.readLong();
//If wins the challenge, then close the new connection.
//如果对方id比我小,则关闭连接,只允许大id的server连接小id的server
if (sid & self.getId()) {
* This replica might still believe that the connection to sid is
* up, so we have to shut down the workers before trying to open a
* new connection.
SendWorker sw = senderWorkerMap.get(sid);
if (sw != null) {
sw.finish();
* Now we start a new connection
LOG.debug("Create new connection to server: " + sid);
closeSocket(sock);
connectOne(sid);
// Otherwise start worker threads to receive data.
//如果对方id比我大,允许连接,并初始化单独的IO线程
SendWorker sw = new SendWorker(sock, sid);
RecvWorker rw = new RecvWorker(sock, sid, sw);
sw.setRecv(rw);
SendWorker vsw = senderWorkerMap.get(sid);
if(vsw != null)
vsw.finish();
senderWorkerMap.put(sid, sw);
if (!queueSendMap.containsKey(sid)) {
queueSendMap.put(sid, new ArrayBlockingQueue&ByteBuffer&(
SEND_CAPACITY));
sw.start();
rw.start();
连上后,自己server的IO线程初始化
public boolean initiateConnection(Socket sock, Long sid) {
DataOutputStream dout =
// Sending id and challenge
//先发一个server id
dout = new DataOutputStream(sock.getOutputStream());
dout.writeLong(self.getId());
dout.flush();
// If lost the challenge, then drop the new connection
//如果对方id比自己大,则关闭连接,这样导致的结果就是大id的server才会去连接小id的server,避免连接浪费
if (sid & self.getId()) {
("Have smaller server identifier, so dropping the " +
"connection: (" + sid + ", " + self.getId() + ")");
closeSocket(sock);
// Otherwise proceed with the connection
//如果对方id比自己小,则保持连接,并初始化单独的发送和接受线程
SendWorker sw = new SendWorker(sock, sid);
RecvWorker rw = new RecvWorker(sock, sid, sw);
sw.setRecv(rw);
SendWorker vsw = senderWorkerMap.get(sid);
if(vsw != null)
vsw.finish();
senderWorkerMap.put(sid, sw);
if (!queueSendMap.containsKey(sid)) {
queueSendMap.put(sid, new ArrayBlockingQueue&ByteBuffer&(
SEND_CAPACITY));
sw.start();
rw.start();
通过以上的连接处理,每2台选举机器之间只会建立一个选举连接。
IO发送线程SendWorker启动,开始发送选举消息
while (running && !shutdown && sock != null) {
ByteBuffer b =
//每个server一个发送队列
ArrayBlockingQueue&ByteBuffer& bq = queueSendMap
.get(sid);
if (bq != null) {
b = pollSendQueue(bq, 1000, TimeUnit.MILLISECONDS);
LOG.error("No queue of incoming messages for " +
"server " + sid);
if(b != null){
lastMessageSent.put(sid, b);
} catch (InterruptedException e) {
LOG.warn("Interrupted while waiting for message on queue",
} catch (Exception e) {
LOG.warn("Exception when using channel: for id " + sid + " my id = " +
self.getId() + " error = " + e);
this.finish();
这个时候,其他机器通过IO线程RecvWorker收到消息
public void run() {
threadCnt.incrementAndGet();
while (running && !shutdown && sock != null) {
* Reads the first int to determine the length of the
//包的长度
int length = din.readInt();
if (length &= 0 || length & PACKETMAXSIZE) {
throw new IOException(
"Received packet with invalid packet: "
+ length);
* Allocates a new ByteBuffer to receive the message
//读到内存
byte[] msgArray = new byte[length];
din.readFully(msgArray, 0, length);
ByteBuffer message = ByteBuffer.wrap(msgArray);
//添加到接收队列,后续业务层的接收线程WorkerReceiver会来拿消息
addToRecvQueue(new Message(message.duplicate(), sid));
业务层的接受线程WorkerReceiver拿消息
public void run() {
while (!stop) {
// Sleeps on receive
//从IO线程拿数据
response = manager.pollRecvQueue(3000, TimeUnit.MILLISECONDS);
if(response == null)
* If it is from an observer, respond right away.
* Note that the following predicate assumes that
* if a server is not a follower, then it must be
* an observer. If we ever have any other type of
* learner in the future, we'll have to change the
* way we check for observers.
//如果是Observer,则返回当前选举结果
if(!self.getVotingView().containsKey(response.sid)){
Vote current = self.getCurrentVote();
ToSend notmsg = new ToSend(ToSend.mType.notification,
current.getId(),
current.getZxid(),
logicalclock,
self.getPeerState(),
response.sid,
current.getPeerEpoch());
sendqueue.offer(notmsg);
// Receive new message
// State of peer that sent this message
//对方节点状态
QuorumPeer.ServerState ackstate = QuorumPeer.ServerState.LOOKING;
switch (response.buffer.getInt()) {
ackstate = QuorumPeer.ServerState.LOOKING;
ackstate = QuorumPeer.ServerState.FOLLOWING;
ackstate = QuorumPeer.ServerState.LEADING;
ackstate = QuorumPeer.ServerState.OBSERVING;
// Instantiate Notification and set its attributes
//初始化Notification对象
Notification n = new Notification();
n.leader = response.buffer.getLong();
n.zxid = response.buffer.getLong();
n.electionEpoch = response.buffer.getLong();
n.sid = response.
* If this server is looking, then send proposed leader
//如果自己也在LOOKING,则放入业务接收队列,选举主线程会消费该消息
if(self.getPeerState() == QuorumPeer.ServerState.LOOKING){
recvqueue.offer(n);
//如果自己不在选举中,而对方server在LOOKING中,则向其发送当前的选举结果,当有server加入一个essemble时有用
* If this server is not looking, but the one that sent the ack
* is looking, then send back what it believes to be the leader.
Vote current = self.getCurrentVote();
if(ackstate == QuorumPeer.ServerState.LOOKING){
if(LOG.isDebugEnabled()){
LOG.debug("Sending new notification. My id =
self.getId() + " recipient=" +
response.sid + " zxid=0x" +
Long.toHexString(current.getZxid()) +
" leader=" + current.getId());
ToSend notmsg = new ToSend(
ToSend.mType.notification,
current.getId(),
current.getZxid(),
logicalclock,
self.getPeerState(),
response.sid,
current.getPeerEpoch());
sendqueue.offer(notmsg);
由于整个集群只有3台机器,所以server.1和server.2启动后,即可选举出Leader。后续Leader和Follower开始数据交互,请看后文。
Leader选举小结
1.server启动时默认选举自己,并向整个集群广播
2.收到消息时,通过3层判断:选举轮数,zxid,server id大小判断是否同意对方,如果同意,则修改自己的选票,并向集群广播
3.QuorumCnxManager负责IO处理,每2个server建立一个连接,只允许id大的server连id小的server,每个server启动单独的读写线程处理,使用阻塞IO
4.默认超过半数机器同意时,则选举成功,修改自身状态为LEADING或FOLLOWING
5.Obserer机器不参与选举
浏览 18488
浏览: 164521 次
来自: 杭州
我看的3.4.6版本中takeSnapshot();
这个方 ...
hi,请教个问题,在你的博文中提到下面一句话。我不理解怎么让同 ...
//REQUEST包,事务请求,follower会将事务请求转 ...
如此好文竟然没人顶,赞一个!最近也在学习zookeeper,希 ...
我想知道淘宝HSF使用OSGi集成进去 目的是什么???有哪些 ...}

我要回帖

更多关于 zookeeper 日志查看 的文章

更多推荐

版权声明:文章内容来源于网络,版权归原作者所有,如有侵权请点击这里与我们联系,我们将及时删除。

点击添加站长微信