From cc683659778246637197da7ca8244cb3f5968232 Mon Sep 17 00:00:00 2001 From: Thorsten Sommer Date: Mon, 13 Jul 2015 10:44:03 +0200 Subject: [PATCH] Implemented the fixed ICCC driver specification --- ICCC/ICCCGetHostsReceiver.go | 2 ++ ICCC/ICCCGetListenersReceiver.go | 2 ++ ICCC/ICCCGetVersionReceiver.go | 2 +- ICCC/ICCCRegisterHostReceiver.go | 2 +- ICCC/ICCCRegisterListenerReceiver.go | 2 +- ICCC/InitDB.go | 2 ++ ICCC/RegisterHost2Database.go | 3 ++- ICCC/RegisterListener2Database.go | 3 ++- ICCC/RegisterLocalListener2Database.go | 2 +- ICCC/RegisterThisHost2Database.go | 2 +- ICCC/Scheme/Host.go | 1 + ICCC/Scheme/Listener.go | 1 + ICCC/SystemMessages/ICCCGetHosts.go | 1 + ICCC/SystemMessages/ICCCGetListeners.go | 1 + ICCC/SystemMessages/ICCCGetVersion.go | 2 +- ICCC/SystemMessages/ICCCRegisterHost.go | 1 + ICCC/SystemMessages/ICCCRegisterListener.go | 1 + ICCC/Variables.go | 3 +++ ICCC/WriteMessage2All.go | 14 ++++++++++---- ICCC/WriteMessage2Any.go | 17 ++++++++++++----- System/Version/Variables.go | 2 +- WebServer/Start.go | 2 +- 22 files changed, 49 insertions(+), 19 deletions(-) diff --git a/ICCC/ICCCGetHostsReceiver.go b/ICCC/ICCCGetHostsReceiver.go index cbaab11..77edfc4 100644 --- a/ICCC/ICCCGetHostsReceiver.go +++ b/ICCC/ICCCGetHostsReceiver.go @@ -35,6 +35,7 @@ func ICCCGetHostsReceiver(data map[string][]string) (result map[string][]string) answerMessage := SystemMessages.ICCCGetHostsAnswer{} answerMessage.Hostnames = make([]string, countHosts, countHosts) answerMessage.IPAddressesPorts = make([]string, countHosts, countHosts) + answerMessage.Kinds = make([]byte, countHosts, countHosts) // Loop over all hosts which are currently available at the cache: n := 0 @@ -42,6 +43,7 @@ func ICCCGetHostsReceiver(data map[string][]string) (result map[string][]string) host := entry.Value.(Scheme.Host) answerMessage.Hostnames[n] = host.Hostname answerMessage.IPAddressesPorts[n] = host.IPAddressPort + answerMessage.Kinds[n] = host.Kind n++ } diff --git a/ICCC/ICCCGetListenersReceiver.go b/ICCC/ICCCGetListenersReceiver.go index c93e618..97b3b89 100644 --- a/ICCC/ICCCGetListenersReceiver.go +++ b/ICCC/ICCCGetListenersReceiver.go @@ -36,6 +36,7 @@ func ICCCGetListenersReceiver(data map[string][]string) (result map[string][]str answerMessage.Channels = make([]string, countListeners, countListeners) answerMessage.Commands = make([]string, countListeners, countListeners) answerMessage.IPAddressesPorts = make([]string, countListeners, countListeners) + answerMessage.Kinds = make([]byte, countListeners, countListeners) // Loop over all hosts which are currently available at the cache: n := 0 @@ -44,6 +45,7 @@ func ICCCGetListenersReceiver(data map[string][]string) (result map[string][]str answerMessage.Channels[n] = listener.Channel answerMessage.Commands[n] = listener.Command answerMessage.IPAddressesPorts[n] = listener.IPAddressPort + answerMessage.Kinds[n] = listener.Kind n++ } diff --git a/ICCC/ICCCGetVersionReceiver.go b/ICCC/ICCCGetVersionReceiver.go index dff9f7f..9ec82b9 100644 --- a/ICCC/ICCCGetVersionReceiver.go +++ b/ICCC/ICCCGetVersionReceiver.go @@ -28,7 +28,7 @@ func ICCCGetVersionReceiver(data map[string][]string) (result map[string][]strin // Prepare the answer: answer := SystemMessages.ICCCGetVersionAnswer{} - answer.Kind = `Ocean` + answer.Kind = KindOCEAN answer.Name = Tools.ThisHostname() answer.Version = Version.GetVersion() diff --git a/ICCC/ICCCRegisterHostReceiver.go b/ICCC/ICCCRegisterHostReceiver.go index 0a99c87..8d61a46 100644 --- a/ICCC/ICCCRegisterHostReceiver.go +++ b/ICCC/ICCCRegisterHostReceiver.go @@ -31,7 +31,7 @@ func ICCCRegisterHostReceiver(data map[string][]string) (result map[string][]str Log.LogShort(senderName, LM.CategorySYSTEM, LM.LevelINFO, LM.MessageNameSTARTUP, `ICCC message: Register another host.`, fmt.Sprintf("hostname=%s", messageData.Hostname), fmt.Sprintf("ipAddressPort=%s", messageData.IPAddressPort)) // Execute the command: - registerHost2Database(messageData.Hostname, messageData.IPAddressPort) + registerHost2Database(messageData.Hostname, messageData.IPAddressPort, messageData.Kind) // Update the caches: InitCacheNow() diff --git a/ICCC/ICCCRegisterListenerReceiver.go b/ICCC/ICCCRegisterListenerReceiver.go index fcc87ec..9f1aef5 100644 --- a/ICCC/ICCCRegisterListenerReceiver.go +++ b/ICCC/ICCCRegisterListenerReceiver.go @@ -31,7 +31,7 @@ func ICCCRegisterListenerReceiver(data map[string][]string) (result map[string][ Log.LogShort(senderName, LM.CategorySYSTEM, LM.LevelINFO, LM.MessageNameSTARTUP, `ICCC message: Should register another listener.`, `channel=`+messageData.Channel, `command=`+messageData.Command, `IPAddressPort=`+messageData.IPAddressPort, fmt.Sprintf(`isActive=%v`, messageData.IsActive)) // Execute the command: - registerListener2Database(messageData.Channel, messageData.Command, messageData.IPAddressPort, messageData.IsActive) + registerListener2Database(messageData.Channel, messageData.Command, messageData.IPAddressPort, messageData.IsActive, messageData.Kind) // Update the caches: InitCacheNow() diff --git a/ICCC/InitDB.go b/ICCC/InitDB.go index f6c716c..d1ea63d 100644 --- a/ICCC/InitDB.go +++ b/ICCC/InitDB.go @@ -28,6 +28,7 @@ func initDB() { // // Take care about the indexes for ICCCListener: // + collectionListener.EnsureIndexKey(`Kind`) collectionListener.EnsureIndexKey(`Command`) collectionListener.EnsureIndexKey(`Command`, `IsActive`) @@ -52,6 +53,7 @@ func initDB() { // // Index for hosts: // + collectionHosts.EnsureIndexKey(`Kind`) collectionHosts.EnsureIndexKey(`Hostname`) collectionHosts.EnsureIndexKey(`IPAddressPort`) diff --git a/ICCC/RegisterHost2Database.go b/ICCC/RegisterHost2Database.go index d8cd2d8..ef0884e 100644 --- a/ICCC/RegisterHost2Database.go +++ b/ICCC/RegisterHost2Database.go @@ -8,12 +8,13 @@ import ( ) // Function to register a server to the ICCC. -func registerHost2Database(hostname, ipAddressPort string) { +func registerHost2Database(hostname, ipAddressPort string, kind byte) { // Create the host entry: host := Scheme.Host{} host.Hostname = hostname host.IPAddressPort = ipAddressPort + host.Kind = kind // The query to find already existing entries: selection := bson.D{{`Hostname`, host.Hostname}, {`IPAddressPort`, host.IPAddressPort}} diff --git a/ICCC/RegisterListener2Database.go b/ICCC/RegisterListener2Database.go index 07e170a..09394e7 100644 --- a/ICCC/RegisterListener2Database.go +++ b/ICCC/RegisterListener2Database.go @@ -9,7 +9,7 @@ import ( ) // The internal function to register an listener to ICCC. -func registerListener2Database(channel, command, ipAddressPort string, isActive bool) { +func registerListener2Database(channel, command, ipAddressPort string, isActive bool, kind byte) { Log.LogShort(senderName, LM.CategorySYSTEM, LM.LevelINFO, LM.MessageNameSTARTUP, `Register this ICCC command in to the database.`, `channel=`+channel, `command=`+command, `IPAddressPort=`+ipAddressPort, fmt.Sprintf("isActive=%v", isActive)) entry := Scheme.Listener{} @@ -17,6 +17,7 @@ func registerListener2Database(channel, command, ipAddressPort string, isActive entry.Command = command entry.IsActive = isActive entry.IPAddressPort = ipAddressPort + entry.Kind = kind // // Case: Exists? diff --git a/ICCC/RegisterLocalListener2Database.go b/ICCC/RegisterLocalListener2Database.go index b91aa7c..b6211f3 100644 --- a/ICCC/RegisterLocalListener2Database.go +++ b/ICCC/RegisterLocalListener2Database.go @@ -8,5 +8,5 @@ func registerLocalListener2Database(channel, command string) { there would be no server which can execute the ICCC command. Therefore, every Ocean server registers the own listeners directly. */ - registerListener2Database(channel, command, correctAddressWithPort, true) + registerListener2Database(channel, command, correctAddressWithPort, true, KindOCEAN) } diff --git a/ICCC/RegisterThisHost2Database.go b/ICCC/RegisterThisHost2Database.go index 841c26c..f2a0c95 100644 --- a/ICCC/RegisterThisHost2Database.go +++ b/ICCC/RegisterThisHost2Database.go @@ -12,5 +12,5 @@ func registerThisHost2Database() { there would be no server which can execute the ICCC command. Therefore, every Ocean server registers the own host directly. */ - registerHost2Database(Tools.ThisHostname(), correctAddressWithPort) + registerHost2Database(Tools.ThisHostname(), correctAddressWithPort, KindOCEAN) } diff --git a/ICCC/Scheme/Host.go b/ICCC/Scheme/Host.go index 2d11418..e7def96 100644 --- a/ICCC/Scheme/Host.go +++ b/ICCC/Scheme/Host.go @@ -4,4 +4,5 @@ package Scheme type Host struct { Hostname string `bson:"Hostname"` IPAddressPort string `bson:"IPAddressPort"` + Kind byte `bson:"Kind"` } diff --git a/ICCC/Scheme/Listener.go b/ICCC/Scheme/Listener.go index 794ce7b..73d93be 100644 --- a/ICCC/Scheme/Listener.go +++ b/ICCC/Scheme/Listener.go @@ -6,4 +6,5 @@ type Listener struct { Command string `bson:"Command"` IsActive bool `bson:"IsActive"` IPAddressPort string `bson:"IPAddressPort"` + Kind byte `bson:"Kind"` } diff --git a/ICCC/SystemMessages/ICCCGetHosts.go b/ICCC/SystemMessages/ICCCGetHosts.go index f3a696c..d40e36b 100644 --- a/ICCC/SystemMessages/ICCCGetHosts.go +++ b/ICCC/SystemMessages/ICCCGetHosts.go @@ -8,4 +8,5 @@ type ICCCGetHosts struct { type ICCCGetHostsAnswer struct { Hostnames []string IPAddressesPorts []string + Kinds []byte } diff --git a/ICCC/SystemMessages/ICCCGetListeners.go b/ICCC/SystemMessages/ICCCGetListeners.go index 4a509e5..3c378af 100644 --- a/ICCC/SystemMessages/ICCCGetListeners.go +++ b/ICCC/SystemMessages/ICCCGetListeners.go @@ -9,4 +9,5 @@ type ICCCGetListenersAnswer struct { Channels []string Commands []string IPAddressesPorts []string + Kinds []byte } diff --git a/ICCC/SystemMessages/ICCCGetVersion.go b/ICCC/SystemMessages/ICCCGetVersion.go index bb5e048..2ead29c 100644 --- a/ICCC/SystemMessages/ICCCGetVersion.go +++ b/ICCC/SystemMessages/ICCCGetVersion.go @@ -6,7 +6,7 @@ type ICCCGetVersion struct { // Answer to the version request type ICCCGetVersionAnswer struct { - Kind string // Ocean || Component + Kind byte // Ocean || Component Name string // Ocean: Hostname; Components: Name Version string // The current version } diff --git a/ICCC/SystemMessages/ICCCRegisterHost.go b/ICCC/SystemMessages/ICCCRegisterHost.go index 0ba54ba..c7a20a4 100644 --- a/ICCC/SystemMessages/ICCCRegisterHost.go +++ b/ICCC/SystemMessages/ICCCRegisterHost.go @@ -4,4 +4,5 @@ package SystemMessages type ICCCRegisterHost struct { Hostname string // The hostname for the end-point IPAddressPort string // The IP address and port for the end-point + Kind byte // Ocean || Component } diff --git a/ICCC/SystemMessages/ICCCRegisterListener.go b/ICCC/SystemMessages/ICCCRegisterListener.go index 0f9e36f..9c4fe58 100644 --- a/ICCC/SystemMessages/ICCCRegisterListener.go +++ b/ICCC/SystemMessages/ICCCRegisterListener.go @@ -6,4 +6,5 @@ type ICCCRegisterListener struct { Command string // The provided command IsActive bool // Is the command active? IPAddressPort string // The IP address and port for the end-point + Kind byte // Ocean || Component } diff --git a/ICCC/Variables.go b/ICCC/Variables.go index c6539db..26c1e34 100644 --- a/ICCC/Variables.go +++ b/ICCC/Variables.go @@ -16,6 +16,9 @@ const ( ChannelICCC string = `System::ICCC` // A common ICCC channel. ChannelPING string = `System::Ping` // A channel for pings. ChannelLOGGING string = `System::Logging` // A channel for send log events to the logging system + KindOCEAN byte = 0x0 + KindComponent byte = 0x1 + KindALL byte = 0xFF ) var ( diff --git a/ICCC/WriteMessage2All.go b/ICCC/WriteMessage2All.go index c92733a..78a9e1a 100644 --- a/ICCC/WriteMessage2All.go +++ b/ICCC/WriteMessage2All.go @@ -7,7 +7,7 @@ import ( ) // Function to broadcast a message to all listeners. -func WriteMessage2All(channel, command string, message interface{}, answerPrototype interface{}) (results []interface{}) { +func WriteMessage2All(channel, command string, kind byte, message interface{}, answerPrototype interface{}) (results []interface{}) { cacheListenerDatabaseLock.RLock() defer cacheListenerDatabaseLock.RUnlock() @@ -21,9 +21,15 @@ func WriteMessage2All(channel, command string, message interface{}, answerProtot for entry := cacheListenerDatabase.Front(); entry != nil; entry = entry.Next() { listener := entry.Value.(Scheme.Listener) - // If the channel and the command matches, deliver the message: - if listener.Channel == channel && listener.Command == command { - matchingListener = append(matchingListener, listener) + // If the channel, command and kind matches, deliver the message: + if kind == KindALL { + if listener.Channel == channel && listener.Command == command { + matchingListener = append(matchingListener, listener) + } + } else { + if listener.Channel == channel && listener.Command == command && listener.Kind == kind { + matchingListener = append(matchingListener, listener) + } } } diff --git a/ICCC/WriteMessage2Any.go b/ICCC/WriteMessage2Any.go index 5b7aa82..a0b2384 100644 --- a/ICCC/WriteMessage2Any.go +++ b/ICCC/WriteMessage2Any.go @@ -8,7 +8,7 @@ import ( ) // Function to write a message to any listener. -func WriteMessage2Any(channel, command string, message interface{}, answerPrototype interface{}) (result interface{}) { +func WriteMessage2Any(channel, command string, kind byte, message interface{}, answerPrototype interface{}) (result interface{}) { cacheListenerDatabaseLock.RLock() defer cacheListenerDatabaseLock.RUnlock() @@ -22,10 +22,17 @@ func WriteMessage2Any(channel, command string, message interface{}, answerProtot for entry := cacheListenerDatabase.Front(); entry != nil; entry = entry.Next() { listener := entry.Value.(Scheme.Listener) - // If the channel and the command matches, store the listener: - if listener.Channel == channel && listener.Command == command { - entries = entries[:len(entries)+1] - entries[counter] = listener + // If the channel, command and kind matches, store the listener: + if kind == KindALL { + if listener.Channel == channel && listener.Command == command { + entries = entries[:len(entries)+1] + entries[counter] = listener + } + } else { + if listener.Channel == channel && listener.Command == command && listener.Kind == kind { + entries = entries[:len(entries)+1] + entries[counter] = listener + } } } diff --git a/System/Version/Variables.go b/System/Version/Variables.go index d5d1152..9ef3acb 100644 --- a/System/Version/Variables.go +++ b/System/Version/Variables.go @@ -1,5 +1,5 @@ package Version var ( - oceansVersion string = `2.0.0` // Ocean's current version + oceansVersion string = `2.0.2` // Ocean's current version ) diff --git a/WebServer/Start.go b/WebServer/Start.go index 05ca59d..6a5810a 100644 --- a/WebServer/Start.go +++ b/WebServer/Start.go @@ -30,7 +30,7 @@ func Start() { } // Notify the whole cluster, that this server is now up and ready: - answers := ICCC.WriteMessage2All(ICCC.ChannelSTARTUP, `System::OceanStart`, data, SystemMessages.ICCCDefaultAnswer{}) + answers := ICCC.WriteMessage2All(ICCC.ChannelSTARTUP, `System::OceanStart`, ICCC.KindALL, data, SystemMessages.ICCCDefaultAnswer{}) for n, obj := range answers { if obj != nil { answer := obj.(SystemMessages.ICCCDefaultAnswer)