Improved ICCC as preparation for external components

This commit is contained in:
Thorsten Sommer 2015-07-10 14:36:47 +02:00
parent a566ccbd6a
commit 86121b733d
31 changed files with 491 additions and 79 deletions

View File

@ -7,43 +7,61 @@ import (
LM "github.com/SommerEngineering/Ocean/Log/Meta" LM "github.com/SommerEngineering/Ocean/Log/Meta"
"github.com/SommerEngineering/Ocean/Shutdown" "github.com/SommerEngineering/Ocean/Shutdown"
"gopkg.in/mgo.v2/bson" "gopkg.in/mgo.v2/bson"
"time"
) )
// Internal function for the timer logic thread. // Internal function for the timer-logic thread.
func cacheTimerLogic(waiting bool) { func cacheTimerLogic() {
// Case: This server goes down now. // Case: This server goes down now.
if Shutdown.IsDown() { if Shutdown.IsDown() {
return return
} }
// Define the query and get the iterator: // Get the current counts:
lastCount := cacheListenerDatabase.Len() lastCountListener := cacheListenerDatabase.Len()
selection := bson.D{{`IsActive`, true}} lastCountHosts := cacheHostDatabase.Len()
entriesIterator := collectionListener.Find(selection).Iter()
entry := Scheme.Listener{} // Define the queries:
selectionListeners := bson.D{{`IsActive`, true}}
selectionHosts := bson.D{}
// Get the iterators:
entriesIteratorListeners := collectionListener.Find(selectionListeners).Iter()
entriesIteratorHosts := collectionHosts.Find(selectionHosts).Iter()
//
// Execute the listeners first:
//
entryListener := Scheme.Listener{}
cacheListenerDatabaseLock.Lock() cacheListenerDatabaseLock.Lock()
// Re-init the cache: // Re-init the cache:
cacheListenerDatabase.Init() cacheListenerDatabase.Init()
// Loop over all entries // Loop over all entries
for entriesIterator.Next(&entry) { for entriesIteratorListeners.Next(&entryListener) {
cacheListenerDatabase.PushBack(entry) cacheListenerDatabase.PushBack(entryListener)
} }
cacheListenerDatabaseLock.Unlock() cacheListenerDatabaseLock.Unlock()
Log.LogShort(senderName, LM.CategorySYSTEM, LM.LevelINFO, LM.MessageNameEXECUTE, `The listener cache was refreshed with the values from the database.`, fmt.Sprintf(`last count=%d`, lastCount), fmt.Sprintf(`new count=%d`, cacheListenerDatabase.Len())) Log.LogShort(senderName, LM.CategorySYSTEM, LM.LevelINFO, LM.MessageNameEXECUTE, `The listener cache was refreshed with the values from the database.`, fmt.Sprintf(`last count=%d`, lastCountListener), fmt.Sprintf(`new count=%d`, cacheListenerDatabase.Len()))
// In case, that this function runs at a thread, we want to wait: //
if waiting { // Execute now the hosts:
nextDuration := time.Duration(5) * time.Minute //
if cacheListenerDatabase.Len() == 0 {
nextDuration = time.Duration(10) * time.Second
}
time.Sleep(nextDuration) entryHost := Scheme.Host{}
cacheHostDatabaseLock.Lock()
// Re-init the cache:
cacheHostDatabase.Init()
// Loop over all entries
for entriesIteratorHosts.Next(&entryHost) {
cacheHostDatabase.PushBack(entryHost)
} }
cacheHostDatabaseLock.Unlock()
Log.LogShort(senderName, LM.CategorySYSTEM, LM.LevelINFO, LM.MessageNameEXECUTE, `The host cache was refreshed with the values from the database.`, fmt.Sprintf(`last count=%d`, lastCountHosts), fmt.Sprintf(`new count=%d`, cacheHostDatabase.Len()))
} }

56
ICCC/ICCCDeleteHost.go Normal file
View File

@ -0,0 +1,56 @@
package ICCC
import (
"fmt"
"github.com/SommerEngineering/Ocean/ICCC/SystemMessages"
"github.com/SommerEngineering/Ocean/Log"
LM "github.com/SommerEngineering/Ocean/Log/Meta"
"gopkg.in/mgo.v2/bson"
)
// The receiver function for the ICCC message, that deletes a host.
func ICCCDeleteHostReceiver(data map[string][]string) (result map[string][]string) {
// Recover from errors:
defer func() {
if err := recover(); err != nil {
Log.LogFull(senderName, LM.CategorySYSTEM, LM.LevelERROR, LM.SeverityUnknown, LM.ImpactUnknown, LM.MessageNamePARSE, "Was not able to execute the ICCC delete host message.")
result = make(map[string][]string, 0)
return
}
}()
// Converts the HTTP form data into an object:
_, _, obj := Data2Message(SystemMessages.ICCCDeleteHost{}, data)
// Was it possible to convert the data?
if obj != nil {
// Convert the object:
messageData := obj.(SystemMessages.ICCCDeleteHost)
// The database selection:
selectionDelete := bson.D{{`Hostname`, messageData.Hostname}, {`IPAddressPort`, messageData.IPAddressPort}}
// Delete the entry:
if errDelete := collectionHosts.Remove(selectionDelete); errDelete != nil {
Log.LogFull(senderName, LM.CategorySYSTEM, LM.LevelERROR, LM.SeverityUnknown, LM.ImpactUnknown, LM.MessageNamePARSE, "Was not able to execute the ICCC delete host message.", errDelete.Error(), fmt.Sprintf("hostname='%s'", messageData.Hostname), fmt.Sprintf("ipAddressPort='%s'", messageData.IPAddressPort))
return Message2Data(``, ``, SystemMessages.AnswerNACK)
} else {
//
// Case: No error
//
// Update the cache as soon as possible:
InitCacheNow()
Log.LogFull(senderName, LM.CategorySYSTEM, LM.LevelWARN, LM.SeverityUnknown, LM.ImpactUnknown, LM.MessageNameSTATE, "An ICCC host was deleted.", fmt.Sprintf("hostname='%s'", messageData.Hostname), fmt.Sprintf("ipAddressPort='%s'", messageData.IPAddressPort))
return Message2Data(``, ``, SystemMessages.AnswerACK)
}
} else {
Log.LogShort(senderName, LM.CategorySYSTEM, LM.LevelINFO, LM.MessageNameSTARTUP, `ICCC message: Was not able to create the message.`)
}
// In any other error case:
result = make(map[string][]string, 0)
return
}

View File

@ -0,0 +1,56 @@
package ICCC
import (
"fmt"
"github.com/SommerEngineering/Ocean/ICCC/SystemMessages"
"github.com/SommerEngineering/Ocean/Log"
LM "github.com/SommerEngineering/Ocean/Log/Meta"
"gopkg.in/mgo.v2/bson"
)
// The receiver function for the ICCC message, that deletes an listener.
func ICCCDeleteListenerReceiver(data map[string][]string) (result map[string][]string) {
// Recover from errors:
defer func() {
if err := recover(); err != nil {
Log.LogFull(senderName, LM.CategorySYSTEM, LM.LevelERROR, LM.SeverityUnknown, LM.ImpactUnknown, LM.MessageNamePARSE, "Was not able to execute the ICCC delete listener message.")
result = make(map[string][]string, 0)
return
}
}()
// Converts the HTTP form data into an object:
_, _, obj := Data2Message(SystemMessages.ICCCDeleteListener{}, data)
// Was it possible to convert the data?
if obj != nil {
// Convert the object:
messageData := obj.(SystemMessages.ICCCDeleteListener)
// The database selection:
selectionDelete := bson.D{{`Channel`, messageData.Channel}, {`Command`, messageData.Command}, {`IPAddressPort`, messageData.IPAddressPort}}
// Delete the entry:
if errDelete := collectionListener.Remove(selectionDelete); errDelete != nil {
Log.LogFull(senderName, LM.CategorySYSTEM, LM.LevelERROR, LM.SeverityUnknown, LM.ImpactUnknown, LM.MessageNamePARSE, "Was not able to execute the ICCC delete listener message.", errDelete.Error(), fmt.Sprintf("channel='%s'", messageData.Channel), fmt.Sprintf("command='%s'", messageData.Command), fmt.Sprintf("ipAddressPort='%s'", messageData.IPAddressPort))
return Message2Data(``, ``, SystemMessages.AnswerNACK)
} else {
//
// Case: No error
//
// Update the cache as soon as possible:
InitCacheNow()
Log.LogFull(senderName, LM.CategorySYSTEM, LM.LevelWARN, LM.SeverityUnknown, LM.ImpactUnknown, LM.MessageNameSTATE, "An ICCC listener was deleted.", fmt.Sprintf("channel='%s'", messageData.Channel), fmt.Sprintf("command='%s'", messageData.Command), fmt.Sprintf("ipAddressPort='%s'", messageData.IPAddressPort))
return Message2Data(``, ``, SystemMessages.AnswerACK)
}
} else {
Log.LogShort(senderName, LM.CategorySYSTEM, LM.LevelINFO, LM.MessageNameSTARTUP, `ICCC message: Was not able to create the message.`)
}
// In any other error case:
result = make(map[string][]string, 0)
return
}

View File

@ -0,0 +1,60 @@
package ICCC
import (
"github.com/SommerEngineering/Ocean/ICCC/Scheme"
"github.com/SommerEngineering/Ocean/ICCC/SystemMessages"
"github.com/SommerEngineering/Ocean/Log"
LM "github.com/SommerEngineering/Ocean/Log/Meta"
)
// The receiver function for the ICCC message, that yields the hosts.
func ICCCGetHostsReceiver(data map[string][]string) (result map[string][]string) {
// Recover from errors:
defer func() {
if err := recover(); err != nil {
Log.LogFull(senderName, LM.CategorySYSTEM, LM.LevelERROR, LM.SeverityUnknown, LM.ImpactUnknown, LM.MessageNamePARSE, "Was not able to execute the ICCC get hosts message.")
result = make(map[string][]string, 0)
return
}
}()
// Converts the HTTP form data into an object:
_, _, obj := Data2Message(SystemMessages.ICCCGetHosts{}, data)
// Was it possible to convert the data?
if obj != nil {
// We have to read from the cache:
cacheHostDatabaseLock.RLock()
// How many hosts we currently known?
countHosts := cacheHostDatabase.Len()
// Prepare the answer object:
answerMessage := SystemMessages.ICCCGetHostsAnswer{}
answerMessage.Hostnames = make([]string, countHosts, countHosts)
answerMessage.IPAddressesPorts = make([]string, countHosts, countHosts)
// Loop over all hosts which are currently available at the cache:
n := 0
for entry := cacheHostDatabase.Front(); entry != nil; entry = entry.Next() {
host := entry.Value.(Scheme.Host)
answerMessage.Hostnames[n] = host.Hostname
answerMessage.IPAddressesPorts[n] = host.IPAddressPort
n++
}
// Unlock the cache:
cacheHostDatabaseLock.RUnlock()
// Send the answer:
return Message2Data(``, ``, answerMessage)
} else {
Log.LogShort(senderName, LM.CategorySYSTEM, LM.LevelINFO, LM.MessageNameSTARTUP, `ICCC message: Was not able to create the message.`)
}
// In any other error case:
result = make(map[string][]string, 0)
return
}

View File

@ -0,0 +1,62 @@
package ICCC
import (
"github.com/SommerEngineering/Ocean/ICCC/Scheme"
"github.com/SommerEngineering/Ocean/ICCC/SystemMessages"
"github.com/SommerEngineering/Ocean/Log"
LM "github.com/SommerEngineering/Ocean/Log/Meta"
)
// The receiver function for the ICCC message, that yields the listeners.
func ICCCGetListenersReceiver(data map[string][]string) (result map[string][]string) {
// Recover from errors:
defer func() {
if err := recover(); err != nil {
Log.LogFull(senderName, LM.CategorySYSTEM, LM.LevelERROR, LM.SeverityUnknown, LM.ImpactUnknown, LM.MessageNamePARSE, "Was not able to execute the ICCC get listeners message.")
result = make(map[string][]string, 0)
return
}
}()
// Converts the HTTP form data into an object:
_, _, obj := Data2Message(SystemMessages.ICCCGetListeners{}, data)
// Was it possible to convert the data?
if obj != nil {
// We have to read from the cache:
cacheListenerDatabaseLock.RLock()
// How many listeners we currently known?
countListeners := cacheListenerDatabase.Len()
// Prepare the answer object:
answerMessage := SystemMessages.ICCCGetListenersAnswer{}
answerMessage.Channels = make([]string, countListeners, countListeners)
answerMessage.Commands = make([]string, countListeners, countListeners)
answerMessage.IPAddressesPorts = make([]string, countListeners, countListeners)
// Loop over all hosts which are currently available at the cache:
n := 0
for entry := cacheListenerDatabase.Front(); entry != nil; entry = entry.Next() {
listener := entry.Value.(Scheme.Listener)
answerMessage.Channels[n] = listener.Channel
answerMessage.Commands[n] = listener.Command
answerMessage.IPAddressesPorts[n] = listener.IPAddressPort
n++
}
// Unlock the cache:
cacheListenerDatabaseLock.RUnlock()
// Send the answer:
return Message2Data(``, ``, answerMessage)
} else {
Log.LogShort(senderName, LM.CategorySYSTEM, LM.LevelINFO, LM.MessageNameSTARTUP, `ICCC message: Was not able to create the message.`)
}
// In any other error case:
result = make(map[string][]string, 0)
return
}

View File

@ -0,0 +1,64 @@
package ICCC
import (
"fmt"
"github.com/SommerEngineering/Ocean/ICCC/Scheme"
"github.com/SommerEngineering/Ocean/ICCC/SystemMessages"
"github.com/SommerEngineering/Ocean/Log"
LM "github.com/SommerEngineering/Ocean/Log/Meta"
"gopkg.in/mgo.v2/bson"
)
// The receiver function for the ICCC message, that updates an listener.
func ICCCListenerUpdateReceiver(data map[string][]string) (result map[string][]string) {
// Recover from errors:
defer func() {
if err := recover(); err != nil {
Log.LogFull(senderName, LM.CategorySYSTEM, LM.LevelERROR, LM.SeverityUnknown, LM.ImpactUnknown, LM.MessageNamePARSE, "Was not able to execute the ICCC update listener message.")
result = make(map[string][]string, 0)
return
}
}()
// Converts the HTTP form data into an object:
_, _, obj := Data2Message(SystemMessages.ICCCListenerUpdate{}, data)
// Was it possible to convert the data?
if obj != nil {
// Convert the object:
messageData := obj.(SystemMessages.ICCCListenerUpdate)
// The database selection:
selectionUpdate := bson.D{{`Channel`, messageData.Channel}, {`Command`, messageData.Command}, {`IPAddressPort`, messageData.IPAddressPort}}
// The object with holds the new state:
updatedObject := Scheme.Listener{}
updatedObject.Channel = messageData.Channel
updatedObject.Command = messageData.Command
updatedObject.IPAddressPort = messageData.IPAddressPort
updatedObject.IsActive = messageData.IsActiveNew
// Update the entry:
if errUpdate := collectionListener.Update(selectionUpdate, updatedObject); errUpdate != nil {
Log.LogFull(senderName, LM.CategorySYSTEM, LM.LevelERROR, LM.SeverityUnknown, LM.ImpactUnknown, LM.MessageNamePARSE, "Was not able to execute the ICCC update listener message.", errUpdate.Error(), fmt.Sprintf("channel='%s'", messageData.Channel), fmt.Sprintf("command='%s'", messageData.Command), fmt.Sprintf("ipAddressPort='%s'", messageData.IPAddressPort))
return Message2Data(``, ``, SystemMessages.AnswerNACK)
} else {
//
// Case: No error
//
// Update the cache as soon as possible:
InitCacheNow()
Log.LogFull(senderName, LM.CategorySYSTEM, LM.LevelWARN, LM.SeverityUnknown, LM.ImpactUnknown, LM.MessageNameSTATE, "An ICCC listener was updated.", fmt.Sprintf("channel='%s'", messageData.Channel), fmt.Sprintf("command='%s'", messageData.Command), fmt.Sprintf("ipAddressPort='%s'", messageData.IPAddressPort), fmt.Sprintf("isActive=%v", messageData.IsActiveNew))
return Message2Data(``, ``, SystemMessages.AnswerACK)
}
} else {
Log.LogShort(senderName, LM.CategorySYSTEM, LM.LevelINFO, LM.MessageNameSTARTUP, `ICCC message: Was not able to create the message.`)
}
// In any other error case:
result = make(map[string][]string, 0)
return
}

35
ICCC/ICCCPingReceiver.go Normal file
View File

@ -0,0 +1,35 @@
package ICCC
import (
"github.com/SommerEngineering/Ocean/ICCC/SystemMessages"
"github.com/SommerEngineering/Ocean/Log"
LM "github.com/SommerEngineering/Ocean/Log/Meta"
)
// The receiver function for the ICCC ping message.
func ICCCPingReceiver(data map[string][]string) (result map[string][]string) {
// Recover from errors:
defer func() {
if err := recover(); err != nil {
Log.LogFull(senderName, LM.CategorySYSTEM, LM.LevelERROR, LM.SeverityUnknown, LM.ImpactUnknown, LM.MessageNamePARSE, "Was not able to execute the ICCC ping message.")
result = make(map[string][]string, 0)
return
}
}()
// Converts the HTTP form data into an object:
_, _, obj := Data2Message(SystemMessages.ICCCPing{}, data)
// Was it possible to convert the data?
if obj != nil {
// An answer is necessary:
return Message2Data("", "", SystemMessages.AnswerACK)
} else {
Log.LogShort(senderName, LM.CategorySYSTEM, LM.LevelINFO, LM.MessageNameSTARTUP, `ICCC message: Was not able to convert the ping message.`)
}
// In any other error case:
result = make(map[string][]string, 0)
return
}

View File

@ -8,12 +8,12 @@ import (
) )
// The receiver function for the ICCC message, that registers a host. // The receiver function for the ICCC message, that registers a host.
func ICCCRegisterHost(data map[string][]string) (result map[string][]string) { func ICCCRegisterHostReceiver(data map[string][]string) (result map[string][]string) {
// Recover from errors: // Recover from errors:
defer func() { defer func() {
if err := recover(); err != nil { if err := recover(); err != nil {
Log.LogFull(senderName, LM.CategorySYSTEM, LM.LevelERROR, LM.SeverityUnknown, LM.ImpactUnknown, LM.MessageNamePARSE, fmt.Sprintf("Was not able to execute the ICCC register host message. %s", err)) Log.LogFull(senderName, LM.CategorySYSTEM, LM.LevelERROR, LM.SeverityUnknown, LM.ImpactUnknown, LM.MessageNamePARSE, "Was not able to execute the ICCC register host message.")
result = make(map[string][]string, 0) result = make(map[string][]string, 0)
return return
} }
@ -28,11 +28,14 @@ func ICCCRegisterHost(data map[string][]string) (result map[string][]string) {
messageData := obj.(SystemMessages.ICCCRegisterHost) messageData := obj.(SystemMessages.ICCCRegisterHost)
// Provide a log entry: // Provide a log entry:
Log.LogShort(senderName, LM.CategorySYSTEM, LM.LevelINFO, LM.MessageNameSTARTUP, `ICCC message: Should register another host.`, messageData.Hostname, messageData.IPAddressPort) Log.LogShort(senderName, LM.CategorySYSTEM, LM.LevelINFO, LM.MessageNameSTARTUP, `ICCC message: Register another host.`, fmt.Sprintf("hostname=%s", messageData.Hostname), fmt.Sprintf("ipAddressPort=%s", messageData.IPAddressPort))
// Execute the command: // Execute the command:
registerHost2Database(messageData.Hostname, messageData.IPAddressPort) registerHost2Database(messageData.Hostname, messageData.IPAddressPort)
// Update the caches:
InitCacheNow()
// An answer is necessary: // An answer is necessary:
return Message2Data(``, ``, SystemMessages.AnswerACK) return Message2Data(``, ``, SystemMessages.AnswerACK)
} else { } else {

View File

@ -7,13 +7,13 @@ import (
LM "github.com/SommerEngineering/Ocean/Log/Meta" LM "github.com/SommerEngineering/Ocean/Log/Meta"
) )
// The receiver function for the ICCC message, that registers a command. // The receiver function for the ICCC message, that registers a listener.
func ICCCRegisterCommand(data map[string][]string) (result map[string][]string) { func ICCCRegisterListenerReceiver(data map[string][]string) (result map[string][]string) {
// Recover from errors: // Recover from errors:
defer func() { defer func() {
if err := recover(); err != nil { if err := recover(); err != nil {
Log.LogFull(senderName, LM.CategorySYSTEM, LM.LevelERROR, LM.SeverityUnknown, LM.ImpactUnknown, LM.MessageNamePARSE, fmt.Sprintf("Was not able to execute the ICCC register command message. %s", err)) Log.LogFull(senderName, LM.CategorySYSTEM, LM.LevelERROR, LM.SeverityUnknown, LM.ImpactUnknown, LM.MessageNamePARSE, "Was not able to execute the ICCC register listener message.")
result = make(map[string][]string, 0) result = make(map[string][]string, 0)
return return
} }
@ -28,10 +28,13 @@ func ICCCRegisterCommand(data map[string][]string) (result map[string][]string)
messageData := obj.(SystemMessages.ICCCRegisterListener) messageData := obj.(SystemMessages.ICCCRegisterListener)
// Provide a log entry: // Provide a log entry:
Log.LogShort(senderName, LM.CategorySYSTEM, LM.LevelINFO, LM.MessageNameSTARTUP, `ICCC message: Should register another command.`, `channel=`+messageData.Channel, `command=`+messageData.Command, `IPAddressPort=`+messageData.IPAddressPort, fmt.Sprintf(`isActive=%v`, messageData.IsActive)) Log.LogShort(senderName, LM.CategorySYSTEM, LM.LevelINFO, LM.MessageNameSTARTUP, `ICCC message: Should register another listener.`, `channel=`+messageData.Channel, `command=`+messageData.Command, `IPAddressPort=`+messageData.IPAddressPort, fmt.Sprintf(`isActive=%v`, messageData.IsActive))
// Execute the command: // Execute the command:
registerCommand2Database(messageData.Channel, messageData.Command, messageData.IPAddressPort, messageData.IsActive) registerListener2Database(messageData.Channel, messageData.Command, messageData.IPAddressPort, messageData.IsActive)
// Update the caches:
InitCacheNow()
// An answer is necessary: // An answer is necessary:
return Message2Data(``, ``, SystemMessages.AnswerACK) return Message2Data(``, ``, SystemMessages.AnswerACK)

View File

@ -12,8 +12,9 @@ func init() {
Log.LogShort(senderName, LM.CategorySYSTEM, LM.LevelINFO, LM.MessageNameINIT, `Start init of ICCC.`) Log.LogShort(senderName, LM.CategorySYSTEM, LM.LevelINFO, LM.MessageNameINIT, `Start init of ICCC.`)
defer Log.LogShort(senderName, LM.CategorySYSTEM, LM.LevelINFO, LM.MessageNameINIT, `Done init ICCC.`) defer Log.LogShort(senderName, LM.CategorySYSTEM, LM.LevelINFO, LM.MessageNameINIT, `Done init ICCC.`)
// Create the list as cache for all global listener (not only listener from this server): // Create the list as cache for all global listener and hosts (not only listener from this server):
cacheListenerDatabase = list.New() cacheListenerDatabase = list.New()
cacheHostDatabase = list.New()
// Create a mapping as cache for all local listener end-points (functions): // Create a mapping as cache for all local listener end-points (functions):
listeners = make(map[string]func(data map[string][]string) map[string][]string) listeners = make(map[string]func(data map[string][]string) map[string][]string)

View File

@ -1,13 +1,6 @@
package ICCC package ICCC
// Starts the timer cache once and exit it after (no thread, no endless loop). // Starts the timer cache logic once and return after (no thread, no endless loop)
func InitCacheNow() { func InitCacheNow() {
startCacheTimerLock.Lock() cacheTimerLogic()
defer startCacheTimerLock.Unlock()
if cacheTimerRunning {
return
}
cacheTimerLogic(false)
} }

View File

@ -1,5 +1,9 @@
package ICCC package ICCC
import (
"time"
)
// Setup and starts the cache timer. // Setup and starts the cache timer.
func initCacheTimer() { func initCacheTimer() {
startCacheTimerLock.Lock() startCacheTimerLock.Lock()
@ -11,11 +15,15 @@ func initCacheTimer() {
cacheTimerRunning = true cacheTimerRunning = true
} }
// Start another thread with the timer logic: // Start another thread with the timer-logic:
go func() { go func() {
// Endless loop: // Endless loop:
for { for {
cacheTimerLogic(true) // Execute the logic:
cacheTimerLogic()
// Wait five minutes:
time.Sleep(time.Duration(5) * time.Minute)
} }
}() }()
} }

View File

@ -52,15 +52,11 @@ func initDB() {
// //
// Index for hosts: // Index for hosts:
// //
collectionHosts.EnsureIndexKey(`Hostname`, `IPAddressPort`) collectionHosts.EnsureIndexKey(`Hostname`)
collectionHosts.EnsureIndexKey(`IPAddressPort`)
indexName2 := mgo.Index{} indexName2 := mgo.Index{}
indexName2.Key = []string{`Hostname`} indexName2.Key = []string{`Hostname`, `IPAddressPort`}
indexName2.Unique = true indexName2.Unique = true
collectionHosts.EnsureIndex(indexName2) collectionHosts.EnsureIndex(indexName2)
indexName3 := mgo.Index{}
indexName3.Key = []string{`IPAddressPort`}
indexName3.Unique = true
collectionHosts.EnsureIndex(indexName3)
} }

View File

@ -8,8 +8,8 @@ import (
"gopkg.in/mgo.v2/bson" "gopkg.in/mgo.v2/bson"
) )
// The internal function to register a command to ICCC. // The internal function to register an listener to ICCC.
func registerCommand2Database(channel, command, ipAddressPort string, isActive bool) { func registerListener2Database(channel, command, ipAddressPort string, isActive bool) {
Log.LogShort(senderName, LM.CategorySYSTEM, LM.LevelINFO, LM.MessageNameSTARTUP, `Register this ICCC command in to the database.`, `channel=`+channel, `command=`+command, `IPAddressPort=`+ipAddressPort, fmt.Sprintf("isActive=%v", isActive)) Log.LogShort(senderName, LM.CategorySYSTEM, LM.LevelINFO, LM.MessageNameSTARTUP, `Register this ICCC command in to the database.`, `channel=`+channel, `command=`+command, `IPAddressPort=`+ipAddressPort, fmt.Sprintf("isActive=%v", isActive))
entry := Scheme.Listener{} entry := Scheme.Listener{}

View File

@ -1,12 +0,0 @@
package ICCC
// The internal function to register a local command to ICCC.
func registerLocalCommand2Database(channel, command string) {
/*
Cannot use here the ICCC command to register this command.
Because, this host is maybe the first one. In that case,
there would be no server which can execute the ICCC command.
Therefore, every Ocean server registers the own commans directly.
*/
registerCommand2Database(channel, command, correctAddressWithPort, true)
}

View File

@ -0,0 +1,12 @@
package ICCC
// The internal function to register a local listener to ICCC.
func registerLocalListener2Database(channel, command string) {
/*
Cannot use here the ICCC command to register this listener.
Because, this host is maybe the first one. In that case,
there would be no server which can execute the ICCC command.
Therefore, every Ocean server registers the own listeners directly.
*/
registerListener2Database(channel, command, correctAddressWithPort, true)
}

View File

@ -12,7 +12,7 @@ func Registrar(channel, command string, callback func(data map[string][]string)
defer listenersLock.Unlock() defer listenersLock.Unlock()
// Write the command to the database: // Write the command to the database:
registerLocalCommand2Database(channel, command) registerLocalListener2Database(channel, command)
// Register the command at the local cache: // Register the command at the local cache:
listeners[fmt.Sprintf(`%s::%s`, channel, command)] = callback listeners[fmt.Sprintf(`%s::%s`, channel, command)] = callback

View File

@ -35,5 +35,5 @@ func (a ShutdownFunction) Shutdown() {
// Disconnect the database: // Disconnect the database:
db.Logout() db.Logout()
dbSession.Close() dbSession.Close()
Log.LogShort(senderName, LM.CategoryAPP, LM.LevelWARN, LM.MessageNameSHUTDOWN, `Done shutting down now all ICCC listener for this host.`) Log.LogShort(senderName, LM.CategoryAPP, LM.LevelWARN, LM.MessageNameSHUTDOWN, `Done shutting down all ICCC listener for this host.`)
} }

View File

@ -0,0 +1,7 @@
package SystemMessages
// The message to delete a host from ICCC.
type ICCCDeleteHost struct {
Hostname string
IPAddressPort string
}

View File

@ -0,0 +1,8 @@
package SystemMessages
// The message to delete an listener from ICCC.
type ICCCDeleteListener struct {
Channel string
Command string
IPAddressPort string
}

View File

@ -0,0 +1,11 @@
package SystemMessages
// A message to request the known ICCC hosts.
type ICCCGetHosts struct {
}
// The answer to the hosts request.
type ICCCGetHostsAnswer struct {
Hostnames []string
IPAddressesPorts []string
}

View File

@ -0,0 +1,12 @@
package SystemMessages
// A message to request the known ICCC listeners.
type ICCCGetListeners struct {
}
// The answer to the listeners request.
type ICCCGetListenersAnswer struct {
Channels []string
Commands []string
IPAddressesPorts []string
}

View File

@ -0,0 +1,9 @@
package SystemMessages
// The message to update an listener from ICCC.
type ICCCListenerUpdate struct {
Channel string
Command string
IPAddressPort string
IsActiveNew bool
}

View File

@ -0,0 +1,5 @@
package SystemMessages
// The ping message to check status of other hosts.
type ICCCPing struct {
}

View File

@ -1,6 +1,6 @@
package SystemMessages package SystemMessages
// The message to register a command/listener to ICCC. // The message to register an listener to ICCC.
type ICCCRegisterListener struct { type ICCCRegisterListener struct {
Channel string // The channel for the provided command Channel string // The channel for the provided command
Command string // The provided command Command string // The provided command

View File

@ -27,6 +27,8 @@ var (
listenersLock sync.RWMutex = sync.RWMutex{} // The mutex for the listener cache listenersLock sync.RWMutex = sync.RWMutex{} // The mutex for the listener cache
cacheListenerDatabase *list.List = nil // The globally cache for all listeners from all servers cacheListenerDatabase *list.List = nil // The globally cache for all listeners from all servers
cacheListenerDatabaseLock sync.RWMutex = sync.RWMutex{} // The mutex for the globally cache cacheListenerDatabaseLock sync.RWMutex = sync.RWMutex{} // The mutex for the globally cache
cacheHostDatabase *list.List = nil // The cache for all hosts entries
cacheHostDatabaseLock sync.RWMutex = sync.RWMutex{} // The read-write mutex for the host cache
startCacheTimerLock sync.Mutex = sync.Mutex{} // Mutex for the start timer startCacheTimerLock sync.Mutex = sync.Mutex{} // Mutex for the start timer
cacheTimerRunning bool = false // Is the timer running? cacheTimerRunning bool = false // Is the timer running?
correctAddressWithPort string = `` // The IP address and port of the this local server correctAddressWithPort string = `` // The IP address and port of the this local server

View File

@ -1,7 +1,6 @@
package NumGen package NumGen
import ( import (
"fmt"
"github.com/SommerEngineering/Ocean/ICCC" "github.com/SommerEngineering/Ocean/ICCC"
"github.com/SommerEngineering/Ocean/ICCC/SystemMessages" "github.com/SommerEngineering/Ocean/ICCC/SystemMessages"
"github.com/SommerEngineering/Ocean/Log" "github.com/SommerEngineering/Ocean/Log"
@ -9,12 +8,12 @@ import (
) )
// The receiver function for the ICCC message, that registers a command. // The receiver function for the ICCC message, that registers a command.
func ICCCNextNumber(data map[string][]string) (result map[string][]string) { func ICCCNextNumberReceiver(data map[string][]string) (result map[string][]string) {
// Recover from errors: // Recover from errors:
defer func() { defer func() {
if err := recover(); err != nil { if err := recover(); err != nil {
Log.LogFull(senderName, LM.CategorySYSTEM, LM.LevelERROR, LM.SeverityUnknown, LM.ImpactUnknown, LM.MessageNamePARSE, fmt.Sprintf("Was not able to execute the ICCC next number command message. %s", err)) Log.LogFull(senderName, LM.CategorySYSTEM, LM.LevelERROR, LM.SeverityUnknown, LM.ImpactUnknown, LM.MessageNamePARSE, "Was not able to execute the ICCC next number command message.")
result = make(map[string][]string, 0) result = make(map[string][]string, 0)
return return
} }

View File

@ -9,12 +9,12 @@ import (
) )
// The receiver function for the ICCC message, that a external component is up and running. // The receiver function for the ICCC message, that a external component is up and running.
func icccComponentStartUpMessage(data map[string][]string) (result map[string][]string) { func icccComponentStartUpMessageReceiver(data map[string][]string) (result map[string][]string) {
// Recover from errors: // Recover from errors:
defer func() { defer func() {
if err := recover(); err != nil { if err := recover(); err != nil {
Log.LogFull(senderName, LM.CategorySYSTEM, LM.LevelERROR, LM.SeverityUnknown, LM.ImpactUnknown, LM.MessageNamePARSE, fmt.Sprintf("Was not able to execute the ICCC component startup message. %s", err)) Log.LogFull(senderName, LM.CategorySYSTEM, LM.LevelERROR, LM.SeverityUnknown, LM.ImpactUnknown, LM.MessageNamePARSE, "Was not able to execute the ICCC component startup message.")
result = make(map[string][]string, 0) result = make(map[string][]string, 0)
return return
} }
@ -29,13 +29,12 @@ func icccComponentStartUpMessage(data map[string][]string) (result map[string][]
messageData := obj.(SystemMessages.ICCCComponentStartUpMessage) messageData := obj.(SystemMessages.ICCCComponentStartUpMessage)
// Provide a log entry: // Provide a log entry:
Log.LogShort(senderName, LM.CategorySYSTEM, LM.LevelINFO, LM.MessageNameSTARTUP, `ICCC message: The external component is now up and ready.`, messageData.IPAddressPort) Log.LogShort(senderName, LM.CategorySYSTEM, LM.LevelINFO, LM.MessageNameSTARTUP, `ICCC message: The external component is now up and ready.`, fmt.Sprintf("ipAddressPort=%s", messageData.IPAddressPort))
// An answer is necessary: // An answer is necessary:
return ICCC.Message2Data("", "", SystemMessages.AnswerACK) return ICCC.Message2Data("", "", SystemMessages.AnswerACK)
} else { } else {
Log.LogShort(senderName, LM.CategorySYSTEM, LM.LevelINFO, LM.MessageNameSTARTUP, `ICCC message: Was not able to create the message.`) Log.LogShort(senderName, LM.CategorySYSTEM, LM.LevelINFO, LM.MessageNameSTARTUP, `ICCC message: Was not able to create the message.`)
fmt.Println(`[Error] ICCC message: Was not able to create the message.`)
} }
// In any other error case: // In any other error case:

View File

@ -9,12 +9,12 @@ import (
) )
// The receiver function for the ICCC message, that an Ocean server is up and running. // The receiver function for the ICCC message, that an Ocean server is up and running.
func icccOceanStartUpMessage(data map[string][]string) (result map[string][]string) { func icccOceanStartUpMessageReceiver(data map[string][]string) (result map[string][]string) {
// Recover from errors: // Recover from errors:
defer func() { defer func() {
if err := recover(); err != nil { if err := recover(); err != nil {
Log.LogFull(senderName, LM.CategorySYSTEM, LM.LevelERROR, LM.SeverityUnknown, LM.ImpactUnknown, LM.MessageNamePARSE, fmt.Sprintf("Was not able to execute the ICCC Ocean server startup message. %s", err)) Log.LogFull(senderName, LM.CategorySYSTEM, LM.LevelERROR, LM.SeverityUnknown, LM.ImpactUnknown, LM.MessageNamePARSE, "Was not able to execute the ICCC Ocean server startup message.")
result = make(map[string][]string, 0) result = make(map[string][]string, 0)
return return
} }
@ -29,13 +29,12 @@ func icccOceanStartUpMessage(data map[string][]string) (result map[string][]stri
messageData := obj.(SystemMessages.ICCCOceanStartUpMessage) messageData := obj.(SystemMessages.ICCCOceanStartUpMessage)
// Provide a log entry: // Provide a log entry:
Log.LogShort(senderName, LM.CategorySYSTEM, LM.LevelINFO, LM.MessageNameSTARTUP, `ICCC message: The Ocean server is now up and ready.`, messageData.PublicIPAddressPort, messageData.AdminIPAddressPort) Log.LogShort(senderName, LM.CategorySYSTEM, LM.LevelINFO, LM.MessageNameSTARTUP, `ICCC message: The Ocean server is now up and ready.`, fmt.Sprintf("public server=%s", messageData.PublicIPAddressPort), fmt.Sprintf("admin server=%s", messageData.AdminIPAddressPort))
// An answer is necessary: // An answer is necessary:
return ICCC.Message2Data("", "", SystemMessages.AnswerACK) return ICCC.Message2Data("", "", SystemMessages.AnswerACK)
} else { } else {
Log.LogShort(senderName, LM.CategorySYSTEM, LM.LevelINFO, LM.MessageNameSTARTUP, `ICCC message: Was not able to create the message.`) Log.LogShort(senderName, LM.CategorySYSTEM, LM.LevelINFO, LM.MessageNameSTARTUP, `ICCC message: Was not able to create the message.`)
fmt.Println(`[Error] ICCC message: Was not able to create the message.`)
} }
// In any other error case: // In any other error case:

View File

@ -97,11 +97,17 @@ func initSystem() {
// The logging subsystem is not registered here, because it will be automated called at the end // The logging subsystem is not registered here, because it will be automated called at the end
// Register all system ICCC commands: // Register all system ICCC commands:
ICCC.Registrar(ICCC.ChannelSYSTEM, `System::OceanStart`, icccOceanStartUpMessage) ICCC.Registrar(ICCC.ChannelSTARTUP, `System::OceanStart`, icccOceanStartUpMessageReceiver)
ICCC.Registrar(ICCC.ChannelSYSTEM, `System::ComponentStart`, icccComponentStartUpMessage) ICCC.Registrar(ICCC.ChannelSTARTUP, `System::ComponentStart`, icccComponentStartUpMessageReceiver)
ICCC.Registrar(ICCC.ChannelICCC, `ICCC::RegisterHost`, ICCC.ICCCRegisterHost) ICCC.Registrar(ICCC.ChannelICCC, `ICCC::RegisterHost`, ICCC.ICCCRegisterHostReceiver)
ICCC.Registrar(ICCC.ChannelICCC, `ICCC::RegisterCommand`, ICCC.ICCCRegisterCommand) ICCC.Registrar(ICCC.ChannelICCC, `ICCC::RegisterListener`, ICCC.ICCCRegisterListenerReceiver)
ICCC.Registrar(ICCC.ChannelNUMGEN, `NumGen::Next`, NumGen.ICCCNextNumber) ICCC.Registrar(ICCC.ChannelICCC, `ICCC::DeleteListener`, ICCC.ICCCDeleteListenerReceiver)
ICCC.Registrar(ICCC.ChannelICCC, `ICCC::DeleteHost`, ICCC.ICCCDeleteHostReceiver)
ICCC.Registrar(ICCC.ChannelICCC, `ICCC::ListenerUpdate`, ICCC.ICCCListenerUpdateReceiver)
ICCC.Registrar(ICCC.ChannelICCC, `ICCC::Ping`, ICCC.ICCCPingReceiver)
ICCC.Registrar(ICCC.ChannelICCC, `ICCC::GetHosts`, ICCC.ICCCGetHostsReceiver)
ICCC.Registrar(ICCC.ChannelICCC, `ICCC::GetListeners`, ICCC.ICCCGetListenersReceiver)
ICCC.Registrar(ICCC.ChannelNUMGEN, `NumGen::Next`, NumGen.ICCCNextNumberReceiver)
// Start the ICCC Listener Cache: // Start the ICCC Listener Cache:
ICCC.InitCacheNow() // Blocking, until the job is done ICCC.InitCacheNow() // Blocking, until the job is done

View File

@ -28,7 +28,7 @@ func Start() {
} }
// Notify the whole cluster, that this server is now up and ready: // Notify the whole cluster, that this server is now up and ready:
answers := ICCC.WriteMessage2All(ICCC.ChannelSYSTEM, `System::Start`, data, SystemMessages.ICCCDefaultAnswer{}) answers := ICCC.WriteMessage2All(ICCC.ChannelSTARTUP, `System::OceanStart`, data, SystemMessages.ICCCDefaultAnswer{})
for n, obj := range answers { for n, obj := range answers {
if obj != nil { if obj != nil {
answer := obj.(SystemMessages.ICCCDefaultAnswer) answer := obj.(SystemMessages.ICCCDefaultAnswer)