import socket, logging, threading, time from struct import pack, unpack, calcsize from copy import copy from StringIO import StringIO log = logging.getLogger("wccp") WCCP2_PORT = 2048 WCCP2_HERE_I_AM = 10 WCCP2_I_SEE_YOU = 11 WCCP2_REDIRECT_ASSIGN = 12 WCCP2_REMOVAL_QUERY = 13 WCCP2_SECURITY_INFO = 0 WCCP2_NO_SECURITY = 0 WCCP2_MD5_SECURITY = 1 WCCP2_SERVICE_INFO = 1 WCCP2_SERVICE_STANDARD = 0 WCCP2_SERVICE_DYNAMIC = 1 WCCP2_ROUTER_ID_INFO = 2 WCCP2_WC_ID_INFO = 3 WCCP2_RTR_VIEW_INFO = 4 WCCP2_WC_VIEW_INFO = 5 WCCP2_REDIRECT_ASSIGNMENT = 6 WCCP2_CAPABILITY_INFO = 8 def iounpack(io, fmt): return unpack(fmt, io.read(calcsize(fmt))) class WCCPServiceMismatch(ValueError): pass class Cache(object): def __init__(self, ip, hashrev = 0, Historical = 0, Bucket = '\x00'*32, Weight = 0, Status = 0): self.ip = ip self.hashrev = hashrev self.Historical = Historical self.Bucket = Bucket self.Weight = Weight self.Status = Status def usable(self): return True class Router(object): def __init__(self, ip): self.ip = ip self.id = ip self.ReceiveID = 0 self.caches = [] self.routers = [] self.ChangeNumber = 0 self.Changed = False self.AssignmentKey = None self.Manual = False self.lastSeen = None def usable(self): try: return self.ReceiveID > 0 except AttributeError: return False class WCCPMessage(object): headerFormat = "!IHH" Registry = {} def genHeader(self, type, payload): return pack(self.headerFormat, type, 0x200, len(payload)) + payload genHeader = classmethod(genHeader) def encode(self): return self.genHeader(self.Type, "".join([ x.encode() for x in self.Components ])) def __init__(self, Service, io = None, **kwargs): self.Service = Service self.kwargs = kwargs if not io is None: self.decodeComponents(io) else: self.initComponents() def decode(self, Service, io, addr = None): header = io.read(calcsize(self.headerFormat)) type, version, length = unpack(self.headerFormat, header) return self.Registry[type](Service, StringIO(io.read(length)), addr = addr) decode = classmethod(decode) def register(self, r): for x in r: self.Registry[x.Type] = x register = classmethod(register) def decodeComponents(self, io): self.Components = [] try: while 1: self.Components.append(WCCPComponent.decode(self, self.Service, io)) except EOFError: pass class WCCPComponent(object): headerFormat = "!HH" Registry = {} def genHeader(self, type, payload): return pack(self.headerFormat, type, len(payload)) + payload genHeader = classmethod(genHeader) def encode(self): return self.genHeader(self.Type, self.payload()) def __init__(self, Service, Message = None, io = None): self.Service = Service if not Message is None: self.Message = Message if not io is None: self.decodePayload(io) def decode(self, Message, Service, io): header = io.read(calcsize(self.headerFormat)) if len(header) < calcsize(self.headerFormat): raise EOFError type, length = unpack(self.headerFormat, header) return self.Registry[type](Service, Message, StringIO(io.read(length))) decode = classmethod(decode) def register(self, r): for x in r: self.Registry[x.Type] = x register = classmethod(register) class WCCPSecurityInfoComponent(WCCPComponent): Type = WCCP2_SECURITY_INFO def payload(self): return pack("!I", WCCP2_NO_SECURITY) def decodePayload(self, io): pass class WCCPServiceInfoComponent(WCCPComponent): Type = WCCP2_SERVICE_INFO def payload(self): if not self.Service.ServiceType: return pack( "!BBBBI8h", WCCP2_SERVICE_STANDARD, # Service type self.Service.ServiceID, # Service ID 0, # Priority 0, # Protocol 0, # Service flags *[0]*8 # Ports ) else: return pack( "!BBBBI8h", WCCP2_SERVICE_DYNAMIC, # Service type self.Service.ServiceID, # Service ID self.Service.ServicePriority, # Priority self.Service.ServiceProtocol, # Protocol self.Service.ServiceFlags, # Service flags *(self.Service.ServicePorts[0:8] + [0] * (8 - len(self.Service.ServicePorts[0:8]))) # Ports ) def decodePayload(self, io): x = iounpack(io, "!BBBBI8h") ServiceType, ServiceID, Priority, Protocol, Flags = x[0:5] Ports = list(x[5:]) ServiceType = ServiceType == WCCP2_SERVICE_DYNAMIC if ServiceType != self.Service.ServiceType or ServiceID != self.Service.ServiceID: raise WCCPServiceMismatch class WCCPRouterIdentityInfoComponent(WCCPComponent): Type = WCCP2_ROUTER_ID_INFO def decodePayload(self, io): RouterID, RouterRecvID = iounpack(io, "!4sI") SendToAddress, NumRecvFrom = iounpack(io, "!4sI") caches = [] Flag = False while NumRecvFrom > 0: c, = iounpack(io, "!4s") NumRecvFrom -= 1 if c == self.Service.me.ip: Flag = True caches.append(c) if not Flag: raise WCCPServiceMismatch self.Service.UpdateRouterID(RouterID, RouterRecvID, socket.inet_aton(self.Message.kwargs['addr'][0])) self.Message.RouterID = RouterID def fWCCPWebCacheIdentityElement(x): return pack( "!4sHH32sHH", x.ip, # Web Cache IP Address 0, # Hash revision x.Historical << 14, # Historical bit + reserved x.Bucket, # Bucket blocks x.Weight, # Weigth x.Status # Status ) class WCCPWebCacheIdentityInfoComponent(WCCPComponent): Type = WCCP2_WC_ID_INFO def payload(self): return fWCCPWebCacheIdentityElement(self.Service.me) def fWCCPRouterIDElement(x): return pack("!4sI", x.id, x.ReceiveID) class WCCPRouterViewInfoComponent(WCCPComponent): Type = WCCP2_RTR_VIEW_INFO def decodePayload(self, io): changeNumber, = iounpack(io, "!I") assignmentKey = iounpack(io, "!4sI") nrouters, = iounpack(io, "!I") routers = [] while nrouters > 0: routers.append(iounpack(io, "!4s")[0]) nrouters -= 1 ncaches, = iounpack(io, "!I") caches = [] while ncaches > 0: caches.append(iounpack(io, "!4sHH32sHH")) ncaches -= 1 self.Service.UpdateRouterView(self.Message.RouterID, changeNumber, assignmentKey, routers, caches) class WCCPWebCacheViewInfoComponent(WCCPComponent): Type = WCCP2_WC_VIEW_INFO def payload(self): usableRouters = [ x for x in self.Service.routers.values() if x.usable() ] usableCaches = [ x for x in self.Service.caches.values() if x.usable() ] return (pack("!II", self.Service.ChangeNumber, len(usableRouters)) + "".join([ fWCCPRouterIDElement(x) for x in usableRouters ]) + pack("!I", len(usableCaches)) + "".join([ pack("!4s", x.ip) for x in usableCaches ])) def buildBuckets(n): r = "" for x in range(0, 256): r += pack("!B", x % n) return r class WCCPAssignmentInfoComponent(WCCPComponent): Type = WCCP2_REDIRECT_ASSIGNMENT def payload(self): usableRouters = [ x for x in self.Service.routers.values() if x.usable() ] usableCaches = [ x for x in self.Service.caches.values() if x.usable() ] return ( pack("!4sII", self.Service.me.ip, self.Service.AssignChangeNumber, len(usableRouters)) + "".join([ pack("!4sII", x.id, x.ReceiveID, x.ChangeNumber) for x in usableRouters ]) + pack("!I", len(usableCaches)) + "".join([ pack("!4s", x.ip) for x in usableCaches ]) + buildBuckets(len(usableCaches)) ) WCCPComponent.register( [ WCCPWebCacheViewInfoComponent, WCCPWebCacheIdentityInfoComponent, WCCPServiceInfoComponent, WCCPSecurityInfoComponent, WCCPRouterIdentityInfoComponent, WCCPRouterViewInfoComponent, WCCPAssignmentInfoComponent ] ) class WCCPHereIAmMessage(WCCPMessage): Type = WCCP2_HERE_I_AM def initComponents(self): self.Components = [] for x in [ WCCPSecurityInfoComponent, WCCPServiceInfoComponent, WCCPWebCacheIdentityInfoComponent, WCCPWebCacheViewInfoComponent ]: self.Components.append(x(self.Service)) class WCCPISeeYouMessage(WCCPMessage): Type = WCCP2_I_SEE_YOU class WCCPRedirectAssignMessage(WCCPMessage): Type = WCCP2_REDIRECT_ASSIGN def initComponents(self): self.Components = [] for x in [ WCCPSecurityInfoComponent, WCCPServiceInfoComponent, WCCPAssignmentInfoComponent ]: self.Components.append(x(self.Service)) WCCPMessage.register([WCCPHereIAmMessage, WCCPISeeYouMessage]) class WCCP2Service(object): def __init__(self, myip): self.me = Cache(myip) self.caches = {} self.routers = {} self.ChangeNumber = 0 self.HIATime = 0 self.ChangeTime = None self.AssignmentSent = False self.AssignChangeNumber = 0 self.Stabilized = False self.ServiceType = False self.ServiceID = 0 def setService(self, type, id, priority = 0, protocol = 0, flags = 0, ports = []): self.ServiceType = type self.ServiceID = id self.ServicePriority = priority self.ServiceProtocol = protocol self.ServiceFlags = flags self.ServicePorts = copy(ports) def HereIAm(self, router): return WCCPHereIAmMessage(self, router = router).encode() def UpdateRouterID(self, ID, ReceiveID, addr): if ID in self.routers: if not self.routers[ID].usable(): self.changed() self.routers[ID].ReceiveID = ReceiveID self.routers[ID].lastSeen = time.time() else: Flag = False for k, v in self.routers.items(): if addr == v.ip: Flag = True v.id = ID self.routers[v.id] = v del self.routers[k] break if Flag: self.routers[ID].ReceiveID = ReceiveID self.routers[ID].lastSeen = time.time() else: r = Router(ID) r.ReceiveID = ReceiveID r.lastSeen = time.time() self.routers[ID] = r self.changed() def UpdateRouterView(self, ID, changeNumber, assignmentKey, routers, caches): r = self.routers[ID] if r.ChangeNumber != changeNumber: r.Changed = True r.ChangeNumber = changeNumber r.AssignmentKey = assignmentKey r.routers = routers r.caches = caches self.ConvergeView() def ConvergeView(self): Flag = False for r in self.routers.values(): if r.Changed: cc = [] for c in r.caches: cc.append(c[0]) if not c[0] in self.caches: ccc = Cache(c[0], c[1], c[2], c[3], c[4], c[5]) self.addCache(ccc) Flag = True cccc = [] for c in self.caches: if not c in cc: cccc.append(c) for c in cccc: del self.caches[c] Flag = True if Flag: self.changed() def addRouter(self, r): r.Manual = True self.routers[r.id] = r self.changed() def SendHereIAm(self, so): for r in self.routers.values(): so.sendto(self.HereIAm(r), (socket.inet_ntoa(r.ip), WCCP2_PORT)) self.Stabilized = True self.HIATime = time.time() def CanBeDesignated(self): return self.me.ip == min(self.caches.keys()) def AssignRedirect(self, so): self.AssignChangeNumber += 1 m = WCCPRedirectAssignMessage(self).encode() for r in self.routers.values(): so.sendto(m, (socket.inet_ntoa(r.ip), WCCP2_PORT)) self.AssignmentSent = True def sweepRouters(self): c = time.time() for k, v in self.routers.items(): if (not v.lastSeen is None) and (c - v.lastSeen) > 30 and not v.Manual: del self.routers[k] def SendUpdates(self, so): self.sweepRouters() c = time.time() if (c - self.HIATime) >= 10 or not self.Stabilized: self.SendHereIAm(so) if (c - self.ChangeTime) >= 20 and not self.AssignmentSent and self.CanBeDesignated(): self.AssignRedirect(so) def changed(self): self.ChangeTime = time.time() self.ChangeNumber += 1 self.AssignmentSent = False self.Stabilized = False def addCache(self, x): self.caches[x.ip] = x class WCCPD(object): def __init__(self, so): self.so = so self.Services = [] self.lock = threading.RLock() self.tSend = threading.Thread(target = self.sendThread) self.tRecv = threading.Thread(target = self.receiveThread) self.tSend.setDaemon(True) self.tRecv.setDaemon(True) self.tSend.start() self.tRecv.start() def parseMessage(self, msg, addr): for srv in self.Services: try: WCCPMessage.decode(srv, StringIO(msg), addr) break except WCCPServiceMismatch: pass def receiveThread(self): while 1: msg, addr = self.so.recvfrom(1500) self.lock.acquire() self.parseMessage(msg, addr) self.lock.release() def sendThread(self): while 1: self.lock.acquire() for srv in self.Services: srv.SendUpdates(self.so) self.lock.release() time.sleep(1) def addService(self, srv): self.lock.acquire() self.Services.append(srv) self.lock.release() so = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) so.bind(('', WCCP2_PORT)) WW = WCCPD(so) W = WCCP2Service(socket.inet_aton('10.82.10.10')) W.addRouter(Router(socket.inet_aton('10.82.10.128'))) WW.addService(W) W = WCCP2Service(socket.inet_aton('10.82.10.10')) W.addRouter(Router(socket.inet_aton('10.82.10.128'))) W.setService(True, 0, 240, 6, 0, [ 3128, 8000, 8010 ]) WW.addService(W) while 1: eval(raw_input('>>> '))