Implemented the fixed ICCC driver specification

This commit is contained in:
Thorsten Sommer 2015-07-13 10:44:03 +02:00
parent 72e2870684
commit cc68365977
22 changed files with 49 additions and 19 deletions

View File

@ -35,6 +35,7 @@ func ICCCGetHostsReceiver(data map[string][]string) (result map[string][]string)
answerMessage := SystemMessages.ICCCGetHostsAnswer{} answerMessage := SystemMessages.ICCCGetHostsAnswer{}
answerMessage.Hostnames = make([]string, countHosts, countHosts) answerMessage.Hostnames = make([]string, countHosts, countHosts)
answerMessage.IPAddressesPorts = make([]string, countHosts, countHosts) answerMessage.IPAddressesPorts = make([]string, countHosts, countHosts)
answerMessage.Kinds = make([]byte, countHosts, countHosts)
// Loop over all hosts which are currently available at the cache: // Loop over all hosts which are currently available at the cache:
n := 0 n := 0
@ -42,6 +43,7 @@ func ICCCGetHostsReceiver(data map[string][]string) (result map[string][]string)
host := entry.Value.(Scheme.Host) host := entry.Value.(Scheme.Host)
answerMessage.Hostnames[n] = host.Hostname answerMessage.Hostnames[n] = host.Hostname
answerMessage.IPAddressesPorts[n] = host.IPAddressPort answerMessage.IPAddressesPorts[n] = host.IPAddressPort
answerMessage.Kinds[n] = host.Kind
n++ n++
} }

View File

@ -36,6 +36,7 @@ func ICCCGetListenersReceiver(data map[string][]string) (result map[string][]str
answerMessage.Channels = make([]string, countListeners, countListeners) answerMessage.Channels = make([]string, countListeners, countListeners)
answerMessage.Commands = make([]string, countListeners, countListeners) answerMessage.Commands = make([]string, countListeners, countListeners)
answerMessage.IPAddressesPorts = make([]string, countListeners, countListeners) answerMessage.IPAddressesPorts = make([]string, countListeners, countListeners)
answerMessage.Kinds = make([]byte, countListeners, countListeners)
// Loop over all hosts which are currently available at the cache: // Loop over all hosts which are currently available at the cache:
n := 0 n := 0
@ -44,6 +45,7 @@ func ICCCGetListenersReceiver(data map[string][]string) (result map[string][]str
answerMessage.Channels[n] = listener.Channel answerMessage.Channels[n] = listener.Channel
answerMessage.Commands[n] = listener.Command answerMessage.Commands[n] = listener.Command
answerMessage.IPAddressesPorts[n] = listener.IPAddressPort answerMessage.IPAddressesPorts[n] = listener.IPAddressPort
answerMessage.Kinds[n] = listener.Kind
n++ n++
} }

View File

@ -28,7 +28,7 @@ func ICCCGetVersionReceiver(data map[string][]string) (result map[string][]strin
// Prepare the answer: // Prepare the answer:
answer := SystemMessages.ICCCGetVersionAnswer{} answer := SystemMessages.ICCCGetVersionAnswer{}
answer.Kind = `Ocean` answer.Kind = KindOCEAN
answer.Name = Tools.ThisHostname() answer.Name = Tools.ThisHostname()
answer.Version = Version.GetVersion() answer.Version = Version.GetVersion()

View File

@ -31,7 +31,7 @@ func ICCCRegisterHostReceiver(data map[string][]string) (result map[string][]str
Log.LogShort(senderName, LM.CategorySYSTEM, LM.LevelINFO, LM.MessageNameSTARTUP, `ICCC message: Register another host.`, fmt.Sprintf("hostname=%s", messageData.Hostname), fmt.Sprintf("ipAddressPort=%s", messageData.IPAddressPort)) Log.LogShort(senderName, LM.CategorySYSTEM, LM.LevelINFO, LM.MessageNameSTARTUP, `ICCC message: Register another host.`, fmt.Sprintf("hostname=%s", messageData.Hostname), fmt.Sprintf("ipAddressPort=%s", messageData.IPAddressPort))
// Execute the command: // Execute the command:
registerHost2Database(messageData.Hostname, messageData.IPAddressPort) registerHost2Database(messageData.Hostname, messageData.IPAddressPort, messageData.Kind)
// Update the caches: // Update the caches:
InitCacheNow() InitCacheNow()

View File

@ -31,7 +31,7 @@ func ICCCRegisterListenerReceiver(data map[string][]string) (result map[string][
Log.LogShort(senderName, LM.CategorySYSTEM, LM.LevelINFO, LM.MessageNameSTARTUP, `ICCC message: Should register another listener.`, `channel=`+messageData.Channel, `command=`+messageData.Command, `IPAddressPort=`+messageData.IPAddressPort, fmt.Sprintf(`isActive=%v`, messageData.IsActive)) Log.LogShort(senderName, LM.CategorySYSTEM, LM.LevelINFO, LM.MessageNameSTARTUP, `ICCC message: Should register another listener.`, `channel=`+messageData.Channel, `command=`+messageData.Command, `IPAddressPort=`+messageData.IPAddressPort, fmt.Sprintf(`isActive=%v`, messageData.IsActive))
// Execute the command: // Execute the command:
registerListener2Database(messageData.Channel, messageData.Command, messageData.IPAddressPort, messageData.IsActive) registerListener2Database(messageData.Channel, messageData.Command, messageData.IPAddressPort, messageData.IsActive, messageData.Kind)
// Update the caches: // Update the caches:
InitCacheNow() InitCacheNow()

View File

@ -28,6 +28,7 @@ func initDB() {
// //
// Take care about the indexes for ICCCListener: // Take care about the indexes for ICCCListener:
// //
collectionListener.EnsureIndexKey(`Kind`)
collectionListener.EnsureIndexKey(`Command`) collectionListener.EnsureIndexKey(`Command`)
collectionListener.EnsureIndexKey(`Command`, `IsActive`) collectionListener.EnsureIndexKey(`Command`, `IsActive`)
@ -52,6 +53,7 @@ func initDB() {
// //
// Index for hosts: // Index for hosts:
// //
collectionHosts.EnsureIndexKey(`Kind`)
collectionHosts.EnsureIndexKey(`Hostname`) collectionHosts.EnsureIndexKey(`Hostname`)
collectionHosts.EnsureIndexKey(`IPAddressPort`) collectionHosts.EnsureIndexKey(`IPAddressPort`)

View File

@ -8,12 +8,13 @@ import (
) )
// Function to register a server to the ICCC. // Function to register a server to the ICCC.
func registerHost2Database(hostname, ipAddressPort string) { func registerHost2Database(hostname, ipAddressPort string, kind byte) {
// Create the host entry: // Create the host entry:
host := Scheme.Host{} host := Scheme.Host{}
host.Hostname = hostname host.Hostname = hostname
host.IPAddressPort = ipAddressPort host.IPAddressPort = ipAddressPort
host.Kind = kind
// The query to find already existing entries: // The query to find already existing entries:
selection := bson.D{{`Hostname`, host.Hostname}, {`IPAddressPort`, host.IPAddressPort}} selection := bson.D{{`Hostname`, host.Hostname}, {`IPAddressPort`, host.IPAddressPort}}

View File

@ -9,7 +9,7 @@ import (
) )
// The internal function to register an listener to ICCC. // The internal function to register an listener to ICCC.
func registerListener2Database(channel, command, ipAddressPort string, isActive bool) { func registerListener2Database(channel, command, ipAddressPort string, isActive bool, kind byte) {
Log.LogShort(senderName, LM.CategorySYSTEM, LM.LevelINFO, LM.MessageNameSTARTUP, `Register this ICCC command in to the database.`, `channel=`+channel, `command=`+command, `IPAddressPort=`+ipAddressPort, fmt.Sprintf("isActive=%v", isActive)) Log.LogShort(senderName, LM.CategorySYSTEM, LM.LevelINFO, LM.MessageNameSTARTUP, `Register this ICCC command in to the database.`, `channel=`+channel, `command=`+command, `IPAddressPort=`+ipAddressPort, fmt.Sprintf("isActive=%v", isActive))
entry := Scheme.Listener{} entry := Scheme.Listener{}
@ -17,6 +17,7 @@ func registerListener2Database(channel, command, ipAddressPort string, isActive
entry.Command = command entry.Command = command
entry.IsActive = isActive entry.IsActive = isActive
entry.IPAddressPort = ipAddressPort entry.IPAddressPort = ipAddressPort
entry.Kind = kind
// //
// Case: Exists? // Case: Exists?

View File

@ -8,5 +8,5 @@ func registerLocalListener2Database(channel, command string) {
there would be no server which can execute the ICCC command. there would be no server which can execute the ICCC command.
Therefore, every Ocean server registers the own listeners directly. Therefore, every Ocean server registers the own listeners directly.
*/ */
registerListener2Database(channel, command, correctAddressWithPort, true) registerListener2Database(channel, command, correctAddressWithPort, true, KindOCEAN)
} }

View File

@ -12,5 +12,5 @@ func registerThisHost2Database() {
there would be no server which can execute the ICCC command. there would be no server which can execute the ICCC command.
Therefore, every Ocean server registers the own host directly. Therefore, every Ocean server registers the own host directly.
*/ */
registerHost2Database(Tools.ThisHostname(), correctAddressWithPort) registerHost2Database(Tools.ThisHostname(), correctAddressWithPort, KindOCEAN)
} }

View File

@ -4,4 +4,5 @@ package Scheme
type Host struct { type Host struct {
Hostname string `bson:"Hostname"` Hostname string `bson:"Hostname"`
IPAddressPort string `bson:"IPAddressPort"` IPAddressPort string `bson:"IPAddressPort"`
Kind byte `bson:"Kind"`
} }

View File

@ -6,4 +6,5 @@ type Listener struct {
Command string `bson:"Command"` Command string `bson:"Command"`
IsActive bool `bson:"IsActive"` IsActive bool `bson:"IsActive"`
IPAddressPort string `bson:"IPAddressPort"` IPAddressPort string `bson:"IPAddressPort"`
Kind byte `bson:"Kind"`
} }

View File

@ -8,4 +8,5 @@ type ICCCGetHosts struct {
type ICCCGetHostsAnswer struct { type ICCCGetHostsAnswer struct {
Hostnames []string Hostnames []string
IPAddressesPorts []string IPAddressesPorts []string
Kinds []byte
} }

View File

@ -9,4 +9,5 @@ type ICCCGetListenersAnswer struct {
Channels []string Channels []string
Commands []string Commands []string
IPAddressesPorts []string IPAddressesPorts []string
Kinds []byte
} }

View File

@ -6,7 +6,7 @@ type ICCCGetVersion struct {
// Answer to the version request // Answer to the version request
type ICCCGetVersionAnswer struct { type ICCCGetVersionAnswer struct {
Kind string // Ocean || Component Kind byte // Ocean || Component
Name string // Ocean: Hostname; Components: Name Name string // Ocean: Hostname; Components: Name
Version string // The current version Version string // The current version
} }

View File

@ -4,4 +4,5 @@ package SystemMessages
type ICCCRegisterHost struct { type ICCCRegisterHost struct {
Hostname string // The hostname for the end-point Hostname string // The hostname for the end-point
IPAddressPort string // The IP address and port for the end-point IPAddressPort string // The IP address and port for the end-point
Kind byte // Ocean || Component
} }

View File

@ -6,4 +6,5 @@ type ICCCRegisterListener struct {
Command string // The provided command Command string // The provided command
IsActive bool // Is the command active? IsActive bool // Is the command active?
IPAddressPort string // The IP address and port for the end-point IPAddressPort string // The IP address and port for the end-point
Kind byte // Ocean || Component
} }

View File

@ -16,6 +16,9 @@ const (
ChannelICCC string = `System::ICCC` // A common ICCC channel. ChannelICCC string = `System::ICCC` // A common ICCC channel.
ChannelPING string = `System::Ping` // A channel for pings. ChannelPING string = `System::Ping` // A channel for pings.
ChannelLOGGING string = `System::Logging` // A channel for send log events to the logging system ChannelLOGGING string = `System::Logging` // A channel for send log events to the logging system
KindOCEAN byte = 0x0
KindComponent byte = 0x1
KindALL byte = 0xFF
) )
var ( var (

View File

@ -7,7 +7,7 @@ import (
) )
// Function to broadcast a message to all listeners. // Function to broadcast a message to all listeners.
func WriteMessage2All(channel, command string, message interface{}, answerPrototype interface{}) (results []interface{}) { func WriteMessage2All(channel, command string, kind byte, message interface{}, answerPrototype interface{}) (results []interface{}) {
cacheListenerDatabaseLock.RLock() cacheListenerDatabaseLock.RLock()
defer cacheListenerDatabaseLock.RUnlock() defer cacheListenerDatabaseLock.RUnlock()
@ -21,9 +21,15 @@ func WriteMessage2All(channel, command string, message interface{}, answerProtot
for entry := cacheListenerDatabase.Front(); entry != nil; entry = entry.Next() { for entry := cacheListenerDatabase.Front(); entry != nil; entry = entry.Next() {
listener := entry.Value.(Scheme.Listener) listener := entry.Value.(Scheme.Listener)
// If the channel and the command matches, deliver the message: // If the channel, command and kind matches, deliver the message:
if listener.Channel == channel && listener.Command == command { if kind == KindALL {
matchingListener = append(matchingListener, listener) if listener.Channel == channel && listener.Command == command {
matchingListener = append(matchingListener, listener)
}
} else {
if listener.Channel == channel && listener.Command == command && listener.Kind == kind {
matchingListener = append(matchingListener, listener)
}
} }
} }

View File

@ -8,7 +8,7 @@ import (
) )
// Function to write a message to any listener. // Function to write a message to any listener.
func WriteMessage2Any(channel, command string, message interface{}, answerPrototype interface{}) (result interface{}) { func WriteMessage2Any(channel, command string, kind byte, message interface{}, answerPrototype interface{}) (result interface{}) {
cacheListenerDatabaseLock.RLock() cacheListenerDatabaseLock.RLock()
defer cacheListenerDatabaseLock.RUnlock() defer cacheListenerDatabaseLock.RUnlock()
@ -22,10 +22,17 @@ func WriteMessage2Any(channel, command string, message interface{}, answerProtot
for entry := cacheListenerDatabase.Front(); entry != nil; entry = entry.Next() { for entry := cacheListenerDatabase.Front(); entry != nil; entry = entry.Next() {
listener := entry.Value.(Scheme.Listener) listener := entry.Value.(Scheme.Listener)
// If the channel and the command matches, store the listener: // If the channel, command and kind matches, store the listener:
if listener.Channel == channel && listener.Command == command { if kind == KindALL {
entries = entries[:len(entries)+1] if listener.Channel == channel && listener.Command == command {
entries[counter] = listener entries = entries[:len(entries)+1]
entries[counter] = listener
}
} else {
if listener.Channel == channel && listener.Command == command && listener.Kind == kind {
entries = entries[:len(entries)+1]
entries[counter] = listener
}
} }
} }

View File

@ -1,5 +1,5 @@
package Version package Version
var ( var (
oceansVersion string = `2.0.0` // Ocean's current version oceansVersion string = `2.0.2` // Ocean's current version
) )

View File

@ -30,7 +30,7 @@ func Start() {
} }
// Notify the whole cluster, that this server is now up and ready: // Notify the whole cluster, that this server is now up and ready:
answers := ICCC.WriteMessage2All(ICCC.ChannelSTARTUP, `System::OceanStart`, data, SystemMessages.ICCCDefaultAnswer{}) answers := ICCC.WriteMessage2All(ICCC.ChannelSTARTUP, `System::OceanStart`, ICCC.KindALL, data, SystemMessages.ICCCDefaultAnswer{})
for n, obj := range answers { for n, obj := range answers {
if obj != nil { if obj != nil {
answer := obj.(SystemMessages.ICCCDefaultAnswer) answer := obj.(SystemMessages.ICCCDefaultAnswer)