PAXOS---最重要的分布式算法----简单模拟


 

最近由于某种原因需要学习分布式系统,其中涉及到分布式系统中的核心内容:PAXOS算法,据说此算法是分布式系统的基石,所有的分布式系统都是在此协议下进行的,是非常重要的分布式算法,用来保证系统的容错性和一致性的。


关于PAXOS的历史和传奇故事有很多,wiki上大把,摘录一段,让大家也了解一下这个协议:


分布式系统中的节点通信存在两种模型:共享内存(Shared memory)和消息传递(Messages passing)。基于消息传递通信模型的分布式系统,不可避免的会发生以下错误:进程可能会慢、垮、重启,消息可能会延迟、丢失、重复,在基础 Paxos 场景中,先不考虑可能出现消息篡改即拜占庭错误的情况。


Paxos 算法解决的问题是在一个可能发生上述异常的分布式系统中如何就某个值达成一致,保证不论发生以上任何异常,都不会破坏决议的一致性。一个典型的场景是,在一个分布式数据库系统中,如果各节点的初始状态一致,每个节点都执行相同的操作序列,那么他们最后能得到一个一致的状态。为保证每个节点执行相同的命令序列,需要在每一条指令上执行一个“一致性算法”以保证每个节点看到的指令一致。一个通用的一致性算法可以应用在许多场景中,是分布式计算中的重要问题。因此从20世纪80年代起对于一致性算法的研究就没有停止过。


为描述 Paxos 算法,Lamport 虚拟了一个叫做 Paxos 的希腊城邦,这个岛按照议会民主制的政治模式制订法律,但是没有人愿意将自己的全部时间和精力放在这种事情上。所以无论是议员,议长或者传递纸条的服务员都不能承诺别人需要时一定会出现,也无法承诺批准决议或者传递消息的时间。但是这里假设没有拜占庭将军问题(Byzantine failure,即虽然有可能一个消息被传递了两次,但是绝对不会出现错误的消息);只要等待足够的时间,消息就会被传到。另外,Paxos 岛上的议员是不会反对其他议员提出的决议的。


对应于分布式系统,议员对应于各个节点,制定的法律对应于系统的状态。各个节点需要进入一个一致的状态,例如在独立Cache的对称多处理器系统中,各个处理器读内存的某个字节时,必须读到同样的一个值,否则系统就违背了一致性的要求。一致性要求对应于法律条文只能有一个版本。议员和服务员的不确定性对应于节点和消息传递通道的不可靠性。


其中提到的Lamport是个计算机大牛,有兴趣的可以自行google


 


按这个描述,可以比较清楚的搞清楚该算法的含义和精髓,当然,可以扩展阅读该作者更多的文章。哈哈


模拟算法中分为三个部分
Leader,这是个辅助线程,用来分发议案的编号等信息
Proposer,这个是议案的提出者,负责提出议案的
Acceptor,这个是议案的响应者,负责表决通过议案的
模拟程序中没有learner,也就是学习者,这个不是必要的
proposer和acceptor实际上是可以兼职的,但是为了简单,我们把他们分开来,各司其职

 


对于一个proposer,他要做的工作其实可以简单描述如下:


1.从leader那里拿到议案的编号和议案的内容
2.发送议案到不少于acceptor总数一半的响应者中,并等待响应者的回应
3.在一个给定的超时时间内,尽可能的收集响应者的回应
4.分析回应
    1)如果全部回应是chosen,直接通过议案,结束

    2)如果有回应是reject,直接停止议案,结束
    3)如果是其他情况,根据回应的议案内容重新生成议案,但是议案的编号不变,再发给所有人。
    4)如果有人的回应由于总总原因没有收到,就再选择一个新的回应者,发送议案
5)回到2继续发送议案,但是此时议案在4的过程中可能已经换了,但是议案的编号并没有变

 


而对于一个acceptor,工作更为简单


1.首先保存一个数据 (s,v,sh) 其中s是上次通过的决议编号,v是通过的决议编号的内容,sh是承诺的最小决议编号,小于这个数的编号直接拒绝。
2.接收proposer过来的数据
3.分析数据:
    1) 如果(s,v,sh)都是0,将接收的数据填入到该数据结构中
    2)如果接收的数据编号小于sh,直接拒绝
    3)如果接收的数据编号大于sh,并且s不为0,通过议案,并要告诉对方你曾经同意过 s,v 的议案
    4)如果接收的数据编号等于sh,直接通过
4跳转到2

 


上面就是该算法的简单描述,当然,在实际中还有很多其他问题需要挨个讨论,目前,我们只是模拟一下这个算法,思想上正确就行了。代码也挺简单,贴一些,全部的代码github上自取,用处不大,只是简单的模拟一下


proposer处理报文
[python] if(self.start_propose==True and time.time()-self.time_start > 5): 
                    printStr(self.name +"的本轮决议"+self.value+"投票结束,同意:"+str(self.accept)+"拒绝:"+str(self.reject) + "选择:"+str(self.chosen)) 
                    self.start_propose=False 
                    if(self.reject>0): 
                        printStr(self.name+"的决议"+self.value+"被否决,停止提议,退出") 
                    if(self.chosen==len(self.acceptors)): 
                        printStr(" "+self.name+"的决议"+self.value+"被同意,完成决议过程 ") 
                    if (self.accept>0 or 
                       (self.chosen<len(self.acceptors) and self.chosen>0 and self.reject==0) or 
                       (self.accept==0 and self.chosen==0 and self.reject==0)): 
                        self.reject=0 
                        self.chosen=0 
                        self.accept=0 
                        self.sendPropose()  

if(self.start_propose==True and time.time()-self.time_start > 5):
                    printStr(self.name +"的本轮决议"+self.value+"投票结束,同意:"+str(self.accept)+"拒绝:"+str(self.reject) + "选择:"+str(self.chosen))
                    self.start_propose=False
                    if(self.reject>0):
                        printStr(self.name+"的决议"+self.value+"被否决,停止提议,退出")
                    if(self.chosen==len(self.acceptors)):
                        printStr(" "+self.name+"的决议"+self.value+"被同意,完成决议过程 ")
                    if (self.accept>0 or
                       (self.chosen<len(self.acceptors) and self.chosen>0 and self.reject==0) or
                       (self.accept==0 and self.chosen==0 and self.reject==0)):
                        self.reject=0
                        self.chosen=0
                        self.accept=0
                        self.sendPropose()

 


acceptor处理报文片段
    [python] ###############################################  
    #  
    #处理议案提出者提出的决议  
    #  
    ###############################################  
    def processPropose(self,value): 
        res={} 
        #如果从来没接收过议案,跟新自身议案  
        if(0==self.values["max"] and 0==self.values["last"]): 
            self.values["max"]=value["Vnum"] 
            self.values["last"]=value["Vnum"] 
            self.values["value"]=value["Value"] 
            res={ 
                 "type":"accpting", 
                 "result":"accept", 
                 "last":0, 
                 "value":self.values["value"], 
                 "accpetor":self.num, 
                 "time":value["time"]} 
        else: 
            #如果收到的议案大于承诺最低表决的议案,同意并告知之前表决结果  
            if(self.values["max"] < value["Vnum"]): 
                self.values["max"]=value["Vnum"] 
                res={ 
                    "type":"accpting", 
                    "result":"accept", 
                     "last":self.values["last"], 
                     "value":self.values["value"], 
                     "accpetor":self.num , 
                     "time":value["time"]} 
            else: 
                #如果收到的议案等于承诺最低表决的议案,完全同意议案,表决结束  
                if(self.values["max"] == value["Vnum"]): 
                     
                    self.values["last"]=value["Vnum"] 
                    self.values["value"]=value["Value"] 
                    res={ 
                        "type":"accpting", 
                        "result":"chosen", 
                        "last":self.values["last"], 
                        "value":self.values["value"], 
                        "accpetor":self.num, 
                        "time":value["time"] 
                     } 
                else: 
                    #如果收到的议案小于承诺最低表决的议案,直接拒绝  
                    res={ 
                        "type":"accpting", 
                        "result":"reject", 
                        "last":self.values["last"], 
                        "value":self.values["value"], 
                        "accpetor":self.num, 
                        "time":value["time"] 
                     } 
        return res 

###############################################
    #
    #处理议案提出者提出的决议
    #
    ###############################################
    def processPropose(self,value):
        res={}
        #如果从来没接收过议案,跟新自身议案
        if(0==self.values["max"] and 0==self.values["last"]):
            self.values["max"]=value["Vnum"]
            self.values["last"]=value["Vnum"]
            self.values["value"]=value["Value"]
            res={
                 "type":"accpting",
                 "result":"accept",
                 "last":0,
                 "value":self.values["value"],
                 "accpetor":self.num,
                 "time":value["time"]}
        else:
            #如果收到的议案大于承诺最低表决的议案,同意并告知之前表决结果
            if(self.values["max"] < value["Vnum"]):
                self.values["max"]=value["Vnum"]
                res={
                    "type":"accpting",
                    "result":"accept",
                     "last":self.values["last"],
                     "value":self.values["value"],
                     "accpetor":self.num ,
                     "time":value["time"]}
            else:
                #如果收到的议案等于承诺最低表决的议案,完全同意议案,表决结束
                if(self.values["max"] == value["Vnum"]):
                   
                    self.values["last"]=value["Vnum"]
                    self.values["value"]=value["Value"]
                    res={
                        "type":"accpting",
                        "result":"chosen",
                        "last":self.values["last"],
                        "value":self.values["value"],
                        "accpetor":self.num,
                        "time":value["time"]
                     }
                else:
                    #如果收到的议案小于承诺最低表决的议案,直接拒绝
                    res={
                        "type":"accpting",
                        "result":"reject",
                        "last":self.values["last"],
                        "value":self.values["value"],
                        "accpetor":self.num,
                        "time":value["time"]
                     }
        return res

 

相关内容

    暂无相关文章

评论关闭