diff --git a/ICCC/CacheTimerLogic.go b/ICCC/CacheTimerLogic.go index 81be0cb..553c26d 100644 --- a/ICCC/CacheTimerLogic.go +++ b/ICCC/CacheTimerLogic.go @@ -7,43 +7,61 @@ import ( LM "github.com/SommerEngineering/Ocean/Log/Meta" "github.com/SommerEngineering/Ocean/Shutdown" "gopkg.in/mgo.v2/bson" - "time" ) -// Internal function for the timer logic thread. -func cacheTimerLogic(waiting bool) { +// Internal function for the timer-logic thread. +func cacheTimerLogic() { // Case: This server goes down now. if Shutdown.IsDown() { return } - // Define the query and get the iterator: - lastCount := cacheListenerDatabase.Len() - selection := bson.D{{`IsActive`, true}} - entriesIterator := collectionListener.Find(selection).Iter() + // Get the current counts: + lastCountListener := cacheListenerDatabase.Len() + lastCountHosts := cacheHostDatabase.Len() - entry := Scheme.Listener{} + // Define the queries: + selectionListeners := bson.D{{`IsActive`, true}} + selectionHosts := bson.D{} + + // Get the iterators: + entriesIteratorListeners := collectionListener.Find(selectionListeners).Iter() + entriesIteratorHosts := collectionHosts.Find(selectionHosts).Iter() + + // + // Execute the listeners first: + // + + entryListener := Scheme.Listener{} cacheListenerDatabaseLock.Lock() // Re-init the cache: cacheListenerDatabase.Init() // Loop over all entries - for entriesIterator.Next(&entry) { - cacheListenerDatabase.PushBack(entry) + for entriesIteratorListeners.Next(&entryListener) { + cacheListenerDatabase.PushBack(entryListener) } cacheListenerDatabaseLock.Unlock() - Log.LogShort(senderName, LM.CategorySYSTEM, LM.LevelINFO, LM.MessageNameEXECUTE, `The listener cache was refreshed with the values from the database.`, fmt.Sprintf(`last count=%d`, lastCount), fmt.Sprintf(`new count=%d`, cacheListenerDatabase.Len())) + Log.LogShort(senderName, LM.CategorySYSTEM, LM.LevelINFO, LM.MessageNameEXECUTE, `The listener cache was refreshed with the values from the database.`, fmt.Sprintf(`last count=%d`, lastCountListener), fmt.Sprintf(`new count=%d`, cacheListenerDatabase.Len())) - // In case, that this function runs at a thread, we want to wait: - if waiting { - nextDuration := time.Duration(5) * time.Minute - if cacheListenerDatabase.Len() == 0 { - nextDuration = time.Duration(10) * time.Second - } + // + // Execute now the hosts: + // - time.Sleep(nextDuration) + entryHost := Scheme.Host{} + cacheHostDatabaseLock.Lock() + + // Re-init the cache: + cacheHostDatabase.Init() + + // Loop over all entries + for entriesIteratorHosts.Next(&entryHost) { + cacheHostDatabase.PushBack(entryHost) } + + cacheHostDatabaseLock.Unlock() + Log.LogShort(senderName, LM.CategorySYSTEM, LM.LevelINFO, LM.MessageNameEXECUTE, `The host cache was refreshed with the values from the database.`, fmt.Sprintf(`last count=%d`, lastCountHosts), fmt.Sprintf(`new count=%d`, cacheHostDatabase.Len())) } diff --git a/ICCC/ICCCDeleteHost.go b/ICCC/ICCCDeleteHost.go new file mode 100644 index 0000000..ee2ebac --- /dev/null +++ b/ICCC/ICCCDeleteHost.go @@ -0,0 +1,56 @@ +package ICCC + +import ( + "fmt" + "github.com/SommerEngineering/Ocean/ICCC/SystemMessages" + "github.com/SommerEngineering/Ocean/Log" + LM "github.com/SommerEngineering/Ocean/Log/Meta" + "gopkg.in/mgo.v2/bson" +) + +// The receiver function for the ICCC message, that deletes a host. +func ICCCDeleteHostReceiver(data map[string][]string) (result map[string][]string) { + + // Recover from errors: + defer func() { + if err := recover(); err != nil { + Log.LogFull(senderName, LM.CategorySYSTEM, LM.LevelERROR, LM.SeverityUnknown, LM.ImpactUnknown, LM.MessageNamePARSE, "Was not able to execute the ICCC delete host message.") + result = make(map[string][]string, 0) + return + } + }() + + // Converts the HTTP form data into an object: + _, _, obj := Data2Message(SystemMessages.ICCCDeleteHost{}, data) + + // Was it possible to convert the data? + if obj != nil { + + // Convert the object: + messageData := obj.(SystemMessages.ICCCDeleteHost) + + // The database selection: + selectionDelete := bson.D{{`Hostname`, messageData.Hostname}, {`IPAddressPort`, messageData.IPAddressPort}} + + // Delete the entry: + if errDelete := collectionHosts.Remove(selectionDelete); errDelete != nil { + Log.LogFull(senderName, LM.CategorySYSTEM, LM.LevelERROR, LM.SeverityUnknown, LM.ImpactUnknown, LM.MessageNamePARSE, "Was not able to execute the ICCC delete host message.", errDelete.Error(), fmt.Sprintf("hostname='%s'", messageData.Hostname), fmt.Sprintf("ipAddressPort='%s'", messageData.IPAddressPort)) + return Message2Data(``, ``, SystemMessages.AnswerNACK) + } else { + // + // Case: No error + // + + // Update the cache as soon as possible: + InitCacheNow() + Log.LogFull(senderName, LM.CategorySYSTEM, LM.LevelWARN, LM.SeverityUnknown, LM.ImpactUnknown, LM.MessageNameSTATE, "An ICCC host was deleted.", fmt.Sprintf("hostname='%s'", messageData.Hostname), fmt.Sprintf("ipAddressPort='%s'", messageData.IPAddressPort)) + return Message2Data(``, ``, SystemMessages.AnswerACK) + } + } else { + Log.LogShort(senderName, LM.CategorySYSTEM, LM.LevelINFO, LM.MessageNameSTARTUP, `ICCC message: Was not able to create the message.`) + } + + // In any other error case: + result = make(map[string][]string, 0) + return +} diff --git a/ICCC/ICCCDeleteListenerReceiver.go b/ICCC/ICCCDeleteListenerReceiver.go new file mode 100644 index 0000000..b8c8788 --- /dev/null +++ b/ICCC/ICCCDeleteListenerReceiver.go @@ -0,0 +1,56 @@ +package ICCC + +import ( + "fmt" + "github.com/SommerEngineering/Ocean/ICCC/SystemMessages" + "github.com/SommerEngineering/Ocean/Log" + LM "github.com/SommerEngineering/Ocean/Log/Meta" + "gopkg.in/mgo.v2/bson" +) + +// The receiver function for the ICCC message, that deletes an listener. +func ICCCDeleteListenerReceiver(data map[string][]string) (result map[string][]string) { + + // Recover from errors: + defer func() { + if err := recover(); err != nil { + Log.LogFull(senderName, LM.CategorySYSTEM, LM.LevelERROR, LM.SeverityUnknown, LM.ImpactUnknown, LM.MessageNamePARSE, "Was not able to execute the ICCC delete listener message.") + result = make(map[string][]string, 0) + return + } + }() + + // Converts the HTTP form data into an object: + _, _, obj := Data2Message(SystemMessages.ICCCDeleteListener{}, data) + + // Was it possible to convert the data? + if obj != nil { + + // Convert the object: + messageData := obj.(SystemMessages.ICCCDeleteListener) + + // The database selection: + selectionDelete := bson.D{{`Channel`, messageData.Channel}, {`Command`, messageData.Command}, {`IPAddressPort`, messageData.IPAddressPort}} + + // Delete the entry: + if errDelete := collectionListener.Remove(selectionDelete); errDelete != nil { + Log.LogFull(senderName, LM.CategorySYSTEM, LM.LevelERROR, LM.SeverityUnknown, LM.ImpactUnknown, LM.MessageNamePARSE, "Was not able to execute the ICCC delete listener message.", errDelete.Error(), fmt.Sprintf("channel='%s'", messageData.Channel), fmt.Sprintf("command='%s'", messageData.Command), fmt.Sprintf("ipAddressPort='%s'", messageData.IPAddressPort)) + return Message2Data(``, ``, SystemMessages.AnswerNACK) + } else { + // + // Case: No error + // + + // Update the cache as soon as possible: + InitCacheNow() + Log.LogFull(senderName, LM.CategorySYSTEM, LM.LevelWARN, LM.SeverityUnknown, LM.ImpactUnknown, LM.MessageNameSTATE, "An ICCC listener was deleted.", fmt.Sprintf("channel='%s'", messageData.Channel), fmt.Sprintf("command='%s'", messageData.Command), fmt.Sprintf("ipAddressPort='%s'", messageData.IPAddressPort)) + return Message2Data(``, ``, SystemMessages.AnswerACK) + } + } else { + Log.LogShort(senderName, LM.CategorySYSTEM, LM.LevelINFO, LM.MessageNameSTARTUP, `ICCC message: Was not able to create the message.`) + } + + // In any other error case: + result = make(map[string][]string, 0) + return +} diff --git a/ICCC/ICCCGetHostsReceiver.go b/ICCC/ICCCGetHostsReceiver.go new file mode 100644 index 0000000..cbaab11 --- /dev/null +++ b/ICCC/ICCCGetHostsReceiver.go @@ -0,0 +1,60 @@ +package ICCC + +import ( + "github.com/SommerEngineering/Ocean/ICCC/Scheme" + "github.com/SommerEngineering/Ocean/ICCC/SystemMessages" + "github.com/SommerEngineering/Ocean/Log" + LM "github.com/SommerEngineering/Ocean/Log/Meta" +) + +// The receiver function for the ICCC message, that yields the hosts. +func ICCCGetHostsReceiver(data map[string][]string) (result map[string][]string) { + + // Recover from errors: + defer func() { + if err := recover(); err != nil { + Log.LogFull(senderName, LM.CategorySYSTEM, LM.LevelERROR, LM.SeverityUnknown, LM.ImpactUnknown, LM.MessageNamePARSE, "Was not able to execute the ICCC get hosts message.") + result = make(map[string][]string, 0) + return + } + }() + + // Converts the HTTP form data into an object: + _, _, obj := Data2Message(SystemMessages.ICCCGetHosts{}, data) + + // Was it possible to convert the data? + if obj != nil { + + // We have to read from the cache: + cacheHostDatabaseLock.RLock() + + // How many hosts we currently known? + countHosts := cacheHostDatabase.Len() + + // Prepare the answer object: + answerMessage := SystemMessages.ICCCGetHostsAnswer{} + answerMessage.Hostnames = make([]string, countHosts, countHosts) + answerMessage.IPAddressesPorts = make([]string, countHosts, countHosts) + + // Loop over all hosts which are currently available at the cache: + n := 0 + for entry := cacheHostDatabase.Front(); entry != nil; entry = entry.Next() { + host := entry.Value.(Scheme.Host) + answerMessage.Hostnames[n] = host.Hostname + answerMessage.IPAddressesPorts[n] = host.IPAddressPort + n++ + } + + // Unlock the cache: + cacheHostDatabaseLock.RUnlock() + + // Send the answer: + return Message2Data(``, ``, answerMessage) + } else { + Log.LogShort(senderName, LM.CategorySYSTEM, LM.LevelINFO, LM.MessageNameSTARTUP, `ICCC message: Was not able to create the message.`) + } + + // In any other error case: + result = make(map[string][]string, 0) + return +} diff --git a/ICCC/ICCCGetListenersReceiver.go b/ICCC/ICCCGetListenersReceiver.go new file mode 100644 index 0000000..c93e618 --- /dev/null +++ b/ICCC/ICCCGetListenersReceiver.go @@ -0,0 +1,62 @@ +package ICCC + +import ( + "github.com/SommerEngineering/Ocean/ICCC/Scheme" + "github.com/SommerEngineering/Ocean/ICCC/SystemMessages" + "github.com/SommerEngineering/Ocean/Log" + LM "github.com/SommerEngineering/Ocean/Log/Meta" +) + +// The receiver function for the ICCC message, that yields the listeners. +func ICCCGetListenersReceiver(data map[string][]string) (result map[string][]string) { + + // Recover from errors: + defer func() { + if err := recover(); err != nil { + Log.LogFull(senderName, LM.CategorySYSTEM, LM.LevelERROR, LM.SeverityUnknown, LM.ImpactUnknown, LM.MessageNamePARSE, "Was not able to execute the ICCC get listeners message.") + result = make(map[string][]string, 0) + return + } + }() + + // Converts the HTTP form data into an object: + _, _, obj := Data2Message(SystemMessages.ICCCGetListeners{}, data) + + // Was it possible to convert the data? + if obj != nil { + + // We have to read from the cache: + cacheListenerDatabaseLock.RLock() + + // How many listeners we currently known? + countListeners := cacheListenerDatabase.Len() + + // Prepare the answer object: + answerMessage := SystemMessages.ICCCGetListenersAnswer{} + answerMessage.Channels = make([]string, countListeners, countListeners) + answerMessage.Commands = make([]string, countListeners, countListeners) + answerMessage.IPAddressesPorts = make([]string, countListeners, countListeners) + + // Loop over all hosts which are currently available at the cache: + n := 0 + for entry := cacheListenerDatabase.Front(); entry != nil; entry = entry.Next() { + listener := entry.Value.(Scheme.Listener) + answerMessage.Channels[n] = listener.Channel + answerMessage.Commands[n] = listener.Command + answerMessage.IPAddressesPorts[n] = listener.IPAddressPort + n++ + } + + // Unlock the cache: + cacheListenerDatabaseLock.RUnlock() + + // Send the answer: + return Message2Data(``, ``, answerMessage) + } else { + Log.LogShort(senderName, LM.CategorySYSTEM, LM.LevelINFO, LM.MessageNameSTARTUP, `ICCC message: Was not able to create the message.`) + } + + // In any other error case: + result = make(map[string][]string, 0) + return +} diff --git a/ICCC/ICCCListenerUpdateReceiver.go b/ICCC/ICCCListenerUpdateReceiver.go new file mode 100644 index 0000000..ce3fc89 --- /dev/null +++ b/ICCC/ICCCListenerUpdateReceiver.go @@ -0,0 +1,64 @@ +package ICCC + +import ( + "fmt" + "github.com/SommerEngineering/Ocean/ICCC/Scheme" + "github.com/SommerEngineering/Ocean/ICCC/SystemMessages" + "github.com/SommerEngineering/Ocean/Log" + LM "github.com/SommerEngineering/Ocean/Log/Meta" + "gopkg.in/mgo.v2/bson" +) + +// The receiver function for the ICCC message, that updates an listener. +func ICCCListenerUpdateReceiver(data map[string][]string) (result map[string][]string) { + + // Recover from errors: + defer func() { + if err := recover(); err != nil { + Log.LogFull(senderName, LM.CategorySYSTEM, LM.LevelERROR, LM.SeverityUnknown, LM.ImpactUnknown, LM.MessageNamePARSE, "Was not able to execute the ICCC update listener message.") + result = make(map[string][]string, 0) + return + } + }() + + // Converts the HTTP form data into an object: + _, _, obj := Data2Message(SystemMessages.ICCCListenerUpdate{}, data) + + // Was it possible to convert the data? + if obj != nil { + + // Convert the object: + messageData := obj.(SystemMessages.ICCCListenerUpdate) + + // The database selection: + selectionUpdate := bson.D{{`Channel`, messageData.Channel}, {`Command`, messageData.Command}, {`IPAddressPort`, messageData.IPAddressPort}} + + // The object with holds the new state: + updatedObject := Scheme.Listener{} + updatedObject.Channel = messageData.Channel + updatedObject.Command = messageData.Command + updatedObject.IPAddressPort = messageData.IPAddressPort + updatedObject.IsActive = messageData.IsActiveNew + + // Update the entry: + if errUpdate := collectionListener.Update(selectionUpdate, updatedObject); errUpdate != nil { + Log.LogFull(senderName, LM.CategorySYSTEM, LM.LevelERROR, LM.SeverityUnknown, LM.ImpactUnknown, LM.MessageNamePARSE, "Was not able to execute the ICCC update listener message.", errUpdate.Error(), fmt.Sprintf("channel='%s'", messageData.Channel), fmt.Sprintf("command='%s'", messageData.Command), fmt.Sprintf("ipAddressPort='%s'", messageData.IPAddressPort)) + return Message2Data(``, ``, SystemMessages.AnswerNACK) + } else { + // + // Case: No error + // + + // Update the cache as soon as possible: + InitCacheNow() + Log.LogFull(senderName, LM.CategorySYSTEM, LM.LevelWARN, LM.SeverityUnknown, LM.ImpactUnknown, LM.MessageNameSTATE, "An ICCC listener was updated.", fmt.Sprintf("channel='%s'", messageData.Channel), fmt.Sprintf("command='%s'", messageData.Command), fmt.Sprintf("ipAddressPort='%s'", messageData.IPAddressPort), fmt.Sprintf("isActive=%v", messageData.IsActiveNew)) + return Message2Data(``, ``, SystemMessages.AnswerACK) + } + } else { + Log.LogShort(senderName, LM.CategorySYSTEM, LM.LevelINFO, LM.MessageNameSTARTUP, `ICCC message: Was not able to create the message.`) + } + + // In any other error case: + result = make(map[string][]string, 0) + return +} diff --git a/ICCC/ICCCPingReceiver.go b/ICCC/ICCCPingReceiver.go new file mode 100644 index 0000000..25ff9ab --- /dev/null +++ b/ICCC/ICCCPingReceiver.go @@ -0,0 +1,35 @@ +package ICCC + +import ( + "github.com/SommerEngineering/Ocean/ICCC/SystemMessages" + "github.com/SommerEngineering/Ocean/Log" + LM "github.com/SommerEngineering/Ocean/Log/Meta" +) + +// The receiver function for the ICCC ping message. +func ICCCPingReceiver(data map[string][]string) (result map[string][]string) { + + // Recover from errors: + defer func() { + if err := recover(); err != nil { + Log.LogFull(senderName, LM.CategorySYSTEM, LM.LevelERROR, LM.SeverityUnknown, LM.ImpactUnknown, LM.MessageNamePARSE, "Was not able to execute the ICCC ping message.") + result = make(map[string][]string, 0) + return + } + }() + + // Converts the HTTP form data into an object: + _, _, obj := Data2Message(SystemMessages.ICCCPing{}, data) + + // Was it possible to convert the data? + if obj != nil { + // An answer is necessary: + return Message2Data("", "", SystemMessages.AnswerACK) + } else { + Log.LogShort(senderName, LM.CategorySYSTEM, LM.LevelINFO, LM.MessageNameSTARTUP, `ICCC message: Was not able to convert the ping message.`) + } + + // In any other error case: + result = make(map[string][]string, 0) + return +} diff --git a/ICCC/ICCCRegisterHostReceiver.go b/ICCC/ICCCRegisterHostReceiver.go index d4aeb1d..0a99c87 100644 --- a/ICCC/ICCCRegisterHostReceiver.go +++ b/ICCC/ICCCRegisterHostReceiver.go @@ -8,12 +8,12 @@ import ( ) // The receiver function for the ICCC message, that registers a host. -func ICCCRegisterHost(data map[string][]string) (result map[string][]string) { +func ICCCRegisterHostReceiver(data map[string][]string) (result map[string][]string) { // Recover from errors: defer func() { if err := recover(); err != nil { - Log.LogFull(senderName, LM.CategorySYSTEM, LM.LevelERROR, LM.SeverityUnknown, LM.ImpactUnknown, LM.MessageNamePARSE, fmt.Sprintf("Was not able to execute the ICCC register host message. %s", err)) + Log.LogFull(senderName, LM.CategorySYSTEM, LM.LevelERROR, LM.SeverityUnknown, LM.ImpactUnknown, LM.MessageNamePARSE, "Was not able to execute the ICCC register host message.") result = make(map[string][]string, 0) return } @@ -28,11 +28,14 @@ func ICCCRegisterHost(data map[string][]string) (result map[string][]string) { messageData := obj.(SystemMessages.ICCCRegisterHost) // Provide a log entry: - Log.LogShort(senderName, LM.CategorySYSTEM, LM.LevelINFO, LM.MessageNameSTARTUP, `ICCC message: Should register another host.`, messageData.Hostname, messageData.IPAddressPort) + Log.LogShort(senderName, LM.CategorySYSTEM, LM.LevelINFO, LM.MessageNameSTARTUP, `ICCC message: Register another host.`, fmt.Sprintf("hostname=%s", messageData.Hostname), fmt.Sprintf("ipAddressPort=%s", messageData.IPAddressPort)) // Execute the command: registerHost2Database(messageData.Hostname, messageData.IPAddressPort) + // Update the caches: + InitCacheNow() + // An answer is necessary: return Message2Data(``, ``, SystemMessages.AnswerACK) } else { diff --git a/ICCC/ICCCRegisterCommandReceiver.go b/ICCC/ICCCRegisterListenerReceiver.go similarity index 65% rename from ICCC/ICCCRegisterCommandReceiver.go rename to ICCC/ICCCRegisterListenerReceiver.go index 65eedfa..fcc87ec 100644 --- a/ICCC/ICCCRegisterCommandReceiver.go +++ b/ICCC/ICCCRegisterListenerReceiver.go @@ -7,13 +7,13 @@ import ( LM "github.com/SommerEngineering/Ocean/Log/Meta" ) -// The receiver function for the ICCC message, that registers a command. -func ICCCRegisterCommand(data map[string][]string) (result map[string][]string) { +// The receiver function for the ICCC message, that registers a listener. +func ICCCRegisterListenerReceiver(data map[string][]string) (result map[string][]string) { // Recover from errors: defer func() { if err := recover(); err != nil { - Log.LogFull(senderName, LM.CategorySYSTEM, LM.LevelERROR, LM.SeverityUnknown, LM.ImpactUnknown, LM.MessageNamePARSE, fmt.Sprintf("Was not able to execute the ICCC register command message. %s", err)) + Log.LogFull(senderName, LM.CategorySYSTEM, LM.LevelERROR, LM.SeverityUnknown, LM.ImpactUnknown, LM.MessageNamePARSE, "Was not able to execute the ICCC register listener message.") result = make(map[string][]string, 0) return } @@ -28,10 +28,13 @@ func ICCCRegisterCommand(data map[string][]string) (result map[string][]string) messageData := obj.(SystemMessages.ICCCRegisterListener) // Provide a log entry: - Log.LogShort(senderName, LM.CategorySYSTEM, LM.LevelINFO, LM.MessageNameSTARTUP, `ICCC message: Should register another command.`, `channel=`+messageData.Channel, `command=`+messageData.Command, `IPAddressPort=`+messageData.IPAddressPort, fmt.Sprintf(`isActive=%v`, messageData.IsActive)) + Log.LogShort(senderName, LM.CategorySYSTEM, LM.LevelINFO, LM.MessageNameSTARTUP, `ICCC message: Should register another listener.`, `channel=`+messageData.Channel, `command=`+messageData.Command, `IPAddressPort=`+messageData.IPAddressPort, fmt.Sprintf(`isActive=%v`, messageData.IsActive)) // Execute the command: - registerCommand2Database(messageData.Channel, messageData.Command, messageData.IPAddressPort, messageData.IsActive) + registerListener2Database(messageData.Channel, messageData.Command, messageData.IPAddressPort, messageData.IsActive) + + // Update the caches: + InitCacheNow() // An answer is necessary: return Message2Data(``, ``, SystemMessages.AnswerACK) diff --git a/ICCC/Init.go b/ICCC/Init.go index 53578c4..2f7825d 100644 --- a/ICCC/Init.go +++ b/ICCC/Init.go @@ -12,8 +12,9 @@ func init() { Log.LogShort(senderName, LM.CategorySYSTEM, LM.LevelINFO, LM.MessageNameINIT, `Start init of ICCC.`) defer Log.LogShort(senderName, LM.CategorySYSTEM, LM.LevelINFO, LM.MessageNameINIT, `Done init ICCC.`) - // Create the list as cache for all global listener (not only listener from this server): + // Create the list as cache for all global listener and hosts (not only listener from this server): cacheListenerDatabase = list.New() + cacheHostDatabase = list.New() // Create a mapping as cache for all local listener end-points (functions): listeners = make(map[string]func(data map[string][]string) map[string][]string) diff --git a/ICCC/InitCacheNow.go b/ICCC/InitCacheNow.go index d643475..df94706 100644 --- a/ICCC/InitCacheNow.go +++ b/ICCC/InitCacheNow.go @@ -1,13 +1,6 @@ package ICCC -// Starts the timer cache once and exit it after (no thread, no endless loop). +// Starts the timer cache logic once and return after (no thread, no endless loop) func InitCacheNow() { - startCacheTimerLock.Lock() - defer startCacheTimerLock.Unlock() - - if cacheTimerRunning { - return - } - - cacheTimerLogic(false) + cacheTimerLogic() } diff --git a/ICCC/InitCacheTimer.go b/ICCC/InitCacheTimer.go index 1f3e9cc..6c9abe5 100644 --- a/ICCC/InitCacheTimer.go +++ b/ICCC/InitCacheTimer.go @@ -1,5 +1,9 @@ package ICCC +import ( + "time" +) + // Setup and starts the cache timer. func initCacheTimer() { startCacheTimerLock.Lock() @@ -11,11 +15,15 @@ func initCacheTimer() { cacheTimerRunning = true } - // Start another thread with the timer logic: + // Start another thread with the timer-logic: go func() { // Endless loop: for { - cacheTimerLogic(true) + // Execute the logic: + cacheTimerLogic() + + // Wait five minutes: + time.Sleep(time.Duration(5) * time.Minute) } }() } diff --git a/ICCC/InitDB.go b/ICCC/InitDB.go index 98dd9f6..f6c716c 100644 --- a/ICCC/InitDB.go +++ b/ICCC/InitDB.go @@ -52,15 +52,11 @@ func initDB() { // // Index for hosts: // - collectionHosts.EnsureIndexKey(`Hostname`, `IPAddressPort`) + collectionHosts.EnsureIndexKey(`Hostname`) + collectionHosts.EnsureIndexKey(`IPAddressPort`) indexName2 := mgo.Index{} - indexName2.Key = []string{`Hostname`} + indexName2.Key = []string{`Hostname`, `IPAddressPort`} indexName2.Unique = true collectionHosts.EnsureIndex(indexName2) - - indexName3 := mgo.Index{} - indexName3.Key = []string{`IPAddressPort`} - indexName3.Unique = true - collectionHosts.EnsureIndex(indexName3) } diff --git a/ICCC/RegisterCommand2Database.go b/ICCC/RegisterListener2Database.go similarity index 92% rename from ICCC/RegisterCommand2Database.go rename to ICCC/RegisterListener2Database.go index 15f3ed6..07e170a 100644 --- a/ICCC/RegisterCommand2Database.go +++ b/ICCC/RegisterListener2Database.go @@ -8,8 +8,8 @@ import ( "gopkg.in/mgo.v2/bson" ) -// The internal function to register a command to ICCC. -func registerCommand2Database(channel, command, ipAddressPort string, isActive bool) { +// The internal function to register an listener to ICCC. +func registerListener2Database(channel, command, ipAddressPort string, isActive bool) { Log.LogShort(senderName, LM.CategorySYSTEM, LM.LevelINFO, LM.MessageNameSTARTUP, `Register this ICCC command in to the database.`, `channel=`+channel, `command=`+command, `IPAddressPort=`+ipAddressPort, fmt.Sprintf("isActive=%v", isActive)) entry := Scheme.Listener{} diff --git a/ICCC/RegisterLocalCommand2Database.go b/ICCC/RegisterLocalCommand2Database.go deleted file mode 100644 index ea4472c..0000000 --- a/ICCC/RegisterLocalCommand2Database.go +++ /dev/null @@ -1,12 +0,0 @@ -package ICCC - -// The internal function to register a local command to ICCC. -func registerLocalCommand2Database(channel, command string) { - /* - Cannot use here the ICCC command to register this command. - Because, this host is maybe the first one. In that case, - there would be no server which can execute the ICCC command. - Therefore, every Ocean server registers the own commans directly. - */ - registerCommand2Database(channel, command, correctAddressWithPort, true) -} diff --git a/ICCC/RegisterLocalListener2Database.go b/ICCC/RegisterLocalListener2Database.go new file mode 100644 index 0000000..b91aa7c --- /dev/null +++ b/ICCC/RegisterLocalListener2Database.go @@ -0,0 +1,12 @@ +package ICCC + +// The internal function to register a local listener to ICCC. +func registerLocalListener2Database(channel, command string) { + /* + Cannot use here the ICCC command to register this listener. + Because, this host is maybe the first one. In that case, + there would be no server which can execute the ICCC command. + Therefore, every Ocean server registers the own listeners directly. + */ + registerListener2Database(channel, command, correctAddressWithPort, true) +} diff --git a/ICCC/Registrar.go b/ICCC/Registrar.go index c0b41cc..c1db7b5 100644 --- a/ICCC/Registrar.go +++ b/ICCC/Registrar.go @@ -12,7 +12,7 @@ func Registrar(channel, command string, callback func(data map[string][]string) defer listenersLock.Unlock() // Write the command to the database: - registerLocalCommand2Database(channel, command) + registerLocalListener2Database(channel, command) // Register the command at the local cache: listeners[fmt.Sprintf(`%s::%s`, channel, command)] = callback diff --git a/ICCC/Shutdown.go b/ICCC/Shutdown.go index 20b04d3..8696fab 100644 --- a/ICCC/Shutdown.go +++ b/ICCC/Shutdown.go @@ -35,5 +35,5 @@ func (a ShutdownFunction) Shutdown() { // Disconnect the database: db.Logout() dbSession.Close() - Log.LogShort(senderName, LM.CategoryAPP, LM.LevelWARN, LM.MessageNameSHUTDOWN, `Done shutting down now all ICCC listener for this host.`) + Log.LogShort(senderName, LM.CategoryAPP, LM.LevelWARN, LM.MessageNameSHUTDOWN, `Done shutting down all ICCC listener for this host.`) } diff --git a/ICCC/SystemMessages/ICCCDeleteHost.go b/ICCC/SystemMessages/ICCCDeleteHost.go new file mode 100644 index 0000000..f6742d3 --- /dev/null +++ b/ICCC/SystemMessages/ICCCDeleteHost.go @@ -0,0 +1,7 @@ +package SystemMessages + +// The message to delete a host from ICCC. +type ICCCDeleteHost struct { + Hostname string + IPAddressPort string +} diff --git a/ICCC/SystemMessages/ICCCDeleteListener.go b/ICCC/SystemMessages/ICCCDeleteListener.go new file mode 100644 index 0000000..e44f806 --- /dev/null +++ b/ICCC/SystemMessages/ICCCDeleteListener.go @@ -0,0 +1,8 @@ +package SystemMessages + +// The message to delete an listener from ICCC. +type ICCCDeleteListener struct { + Channel string + Command string + IPAddressPort string +} diff --git a/ICCC/SystemMessages/ICCCGetHosts.go b/ICCC/SystemMessages/ICCCGetHosts.go new file mode 100644 index 0000000..f3a696c --- /dev/null +++ b/ICCC/SystemMessages/ICCCGetHosts.go @@ -0,0 +1,11 @@ +package SystemMessages + +// A message to request the known ICCC hosts. +type ICCCGetHosts struct { +} + +// The answer to the hosts request. +type ICCCGetHostsAnswer struct { + Hostnames []string + IPAddressesPorts []string +} diff --git a/ICCC/SystemMessages/ICCCGetListeners.go b/ICCC/SystemMessages/ICCCGetListeners.go new file mode 100644 index 0000000..4a509e5 --- /dev/null +++ b/ICCC/SystemMessages/ICCCGetListeners.go @@ -0,0 +1,12 @@ +package SystemMessages + +// A message to request the known ICCC listeners. +type ICCCGetListeners struct { +} + +// The answer to the listeners request. +type ICCCGetListenersAnswer struct { + Channels []string + Commands []string + IPAddressesPorts []string +} diff --git a/ICCC/SystemMessages/ICCCListenerUpdate.go b/ICCC/SystemMessages/ICCCListenerUpdate.go new file mode 100644 index 0000000..df33b4f --- /dev/null +++ b/ICCC/SystemMessages/ICCCListenerUpdate.go @@ -0,0 +1,9 @@ +package SystemMessages + +// The message to update an listener from ICCC. +type ICCCListenerUpdate struct { + Channel string + Command string + IPAddressPort string + IsActiveNew bool +} diff --git a/ICCC/SystemMessages/ICCCPing.go b/ICCC/SystemMessages/ICCCPing.go new file mode 100644 index 0000000..b63d1fb --- /dev/null +++ b/ICCC/SystemMessages/ICCCPing.go @@ -0,0 +1,5 @@ +package SystemMessages + +// The ping message to check status of other hosts. +type ICCCPing struct { +} diff --git a/ICCC/SystemMessages/ICCCRegisterListener.go b/ICCC/SystemMessages/ICCCRegisterListener.go index 4f0378e..0f9e36f 100644 --- a/ICCC/SystemMessages/ICCCRegisterListener.go +++ b/ICCC/SystemMessages/ICCCRegisterListener.go @@ -1,6 +1,6 @@ package SystemMessages -// The message to register a command/listener to ICCC. +// The message to register an listener to ICCC. type ICCCRegisterListener struct { Channel string // The channel for the provided command Command string // The provided command diff --git a/ICCC/Variables.go b/ICCC/Variables.go index dc7a255..9a395fa 100644 --- a/ICCC/Variables.go +++ b/ICCC/Variables.go @@ -27,6 +27,8 @@ var ( listenersLock sync.RWMutex = sync.RWMutex{} // The mutex for the listener cache cacheListenerDatabase *list.List = nil // The globally cache for all listeners from all servers cacheListenerDatabaseLock sync.RWMutex = sync.RWMutex{} // The mutex for the globally cache + cacheHostDatabase *list.List = nil // The cache for all hosts entries + cacheHostDatabaseLock sync.RWMutex = sync.RWMutex{} // The read-write mutex for the host cache startCacheTimerLock sync.Mutex = sync.Mutex{} // Mutex for the start timer cacheTimerRunning bool = false // Is the timer running? correctAddressWithPort string = `` // The IP address and port of the this local server diff --git a/NumGen/ICCCNextNumber.go b/NumGen/ICCCNextNumberReceiver.go similarity index 84% rename from NumGen/ICCCNextNumber.go rename to NumGen/ICCCNextNumberReceiver.go index 32ad4ee..e39e216 100644 --- a/NumGen/ICCCNextNumber.go +++ b/NumGen/ICCCNextNumberReceiver.go @@ -1,7 +1,6 @@ package NumGen import ( - "fmt" "github.com/SommerEngineering/Ocean/ICCC" "github.com/SommerEngineering/Ocean/ICCC/SystemMessages" "github.com/SommerEngineering/Ocean/Log" @@ -9,12 +8,12 @@ import ( ) // The receiver function for the ICCC message, that registers a command. -func ICCCNextNumber(data map[string][]string) (result map[string][]string) { +func ICCCNextNumberReceiver(data map[string][]string) (result map[string][]string) { // Recover from errors: defer func() { if err := recover(); err != nil { - Log.LogFull(senderName, LM.CategorySYSTEM, LM.LevelERROR, LM.SeverityUnknown, LM.ImpactUnknown, LM.MessageNamePARSE, fmt.Sprintf("Was not able to execute the ICCC next number command message. %s", err)) + Log.LogFull(senderName, LM.CategorySYSTEM, LM.LevelERROR, LM.SeverityUnknown, LM.ImpactUnknown, LM.MessageNamePARSE, "Was not able to execute the ICCC next number command message.") result = make(map[string][]string, 0) return } diff --git a/System/ICCCComponentStartUpMessageReceiver.go b/System/ICCCComponentStartUpMessageReceiver.go index aae4ee3..0e7352b 100644 --- a/System/ICCCComponentStartUpMessageReceiver.go +++ b/System/ICCCComponentStartUpMessageReceiver.go @@ -9,12 +9,12 @@ import ( ) // The receiver function for the ICCC message, that a external component is up and running. -func icccComponentStartUpMessage(data map[string][]string) (result map[string][]string) { +func icccComponentStartUpMessageReceiver(data map[string][]string) (result map[string][]string) { // Recover from errors: defer func() { if err := recover(); err != nil { - Log.LogFull(senderName, LM.CategorySYSTEM, LM.LevelERROR, LM.SeverityUnknown, LM.ImpactUnknown, LM.MessageNamePARSE, fmt.Sprintf("Was not able to execute the ICCC component startup message. %s", err)) + Log.LogFull(senderName, LM.CategorySYSTEM, LM.LevelERROR, LM.SeverityUnknown, LM.ImpactUnknown, LM.MessageNamePARSE, "Was not able to execute the ICCC component startup message.") result = make(map[string][]string, 0) return } @@ -29,13 +29,12 @@ func icccComponentStartUpMessage(data map[string][]string) (result map[string][] messageData := obj.(SystemMessages.ICCCComponentStartUpMessage) // Provide a log entry: - Log.LogShort(senderName, LM.CategorySYSTEM, LM.LevelINFO, LM.MessageNameSTARTUP, `ICCC message: The external component is now up and ready.`, messageData.IPAddressPort) + Log.LogShort(senderName, LM.CategorySYSTEM, LM.LevelINFO, LM.MessageNameSTARTUP, `ICCC message: The external component is now up and ready.`, fmt.Sprintf("ipAddressPort=%s", messageData.IPAddressPort)) // An answer is necessary: return ICCC.Message2Data("", "", SystemMessages.AnswerACK) } else { Log.LogShort(senderName, LM.CategorySYSTEM, LM.LevelINFO, LM.MessageNameSTARTUP, `ICCC message: Was not able to create the message.`) - fmt.Println(`[Error] ICCC message: Was not able to create the message.`) } // In any other error case: diff --git a/System/ICCCOceanStartUpMessageReceiver.go b/System/ICCCOceanStartUpMessageReceiver.go index a7276a0..b95a95c 100644 --- a/System/ICCCOceanStartUpMessageReceiver.go +++ b/System/ICCCOceanStartUpMessageReceiver.go @@ -9,12 +9,12 @@ import ( ) // The receiver function for the ICCC message, that an Ocean server is up and running. -func icccOceanStartUpMessage(data map[string][]string) (result map[string][]string) { +func icccOceanStartUpMessageReceiver(data map[string][]string) (result map[string][]string) { // Recover from errors: defer func() { if err := recover(); err != nil { - Log.LogFull(senderName, LM.CategorySYSTEM, LM.LevelERROR, LM.SeverityUnknown, LM.ImpactUnknown, LM.MessageNamePARSE, fmt.Sprintf("Was not able to execute the ICCC Ocean server startup message. %s", err)) + Log.LogFull(senderName, LM.CategorySYSTEM, LM.LevelERROR, LM.SeverityUnknown, LM.ImpactUnknown, LM.MessageNamePARSE, "Was not able to execute the ICCC Ocean server startup message.") result = make(map[string][]string, 0) return } @@ -29,13 +29,12 @@ func icccOceanStartUpMessage(data map[string][]string) (result map[string][]stri messageData := obj.(SystemMessages.ICCCOceanStartUpMessage) // Provide a log entry: - Log.LogShort(senderName, LM.CategorySYSTEM, LM.LevelINFO, LM.MessageNameSTARTUP, `ICCC message: The Ocean server is now up and ready.`, messageData.PublicIPAddressPort, messageData.AdminIPAddressPort) + Log.LogShort(senderName, LM.CategorySYSTEM, LM.LevelINFO, LM.MessageNameSTARTUP, `ICCC message: The Ocean server is now up and ready.`, fmt.Sprintf("public server=%s", messageData.PublicIPAddressPort), fmt.Sprintf("admin server=%s", messageData.AdminIPAddressPort)) // An answer is necessary: return ICCC.Message2Data("", "", SystemMessages.AnswerACK) } else { Log.LogShort(senderName, LM.CategorySYSTEM, LM.LevelINFO, LM.MessageNameSTARTUP, `ICCC message: Was not able to create the message.`) - fmt.Println(`[Error] ICCC message: Was not able to create the message.`) } // In any other error case: diff --git a/System/InitSystem.go b/System/InitSystem.go index 7970ea9..670160e 100644 --- a/System/InitSystem.go +++ b/System/InitSystem.go @@ -97,11 +97,17 @@ func initSystem() { // The logging subsystem is not registered here, because it will be automated called at the end // Register all system ICCC commands: - ICCC.Registrar(ICCC.ChannelSYSTEM, `System::OceanStart`, icccOceanStartUpMessage) - ICCC.Registrar(ICCC.ChannelSYSTEM, `System::ComponentStart`, icccComponentStartUpMessage) - ICCC.Registrar(ICCC.ChannelICCC, `ICCC::RegisterHost`, ICCC.ICCCRegisterHost) - ICCC.Registrar(ICCC.ChannelICCC, `ICCC::RegisterCommand`, ICCC.ICCCRegisterCommand) - ICCC.Registrar(ICCC.ChannelNUMGEN, `NumGen::Next`, NumGen.ICCCNextNumber) + ICCC.Registrar(ICCC.ChannelSTARTUP, `System::OceanStart`, icccOceanStartUpMessageReceiver) + ICCC.Registrar(ICCC.ChannelSTARTUP, `System::ComponentStart`, icccComponentStartUpMessageReceiver) + ICCC.Registrar(ICCC.ChannelICCC, `ICCC::RegisterHost`, ICCC.ICCCRegisterHostReceiver) + ICCC.Registrar(ICCC.ChannelICCC, `ICCC::RegisterListener`, ICCC.ICCCRegisterListenerReceiver) + ICCC.Registrar(ICCC.ChannelICCC, `ICCC::DeleteListener`, ICCC.ICCCDeleteListenerReceiver) + ICCC.Registrar(ICCC.ChannelICCC, `ICCC::DeleteHost`, ICCC.ICCCDeleteHostReceiver) + ICCC.Registrar(ICCC.ChannelICCC, `ICCC::ListenerUpdate`, ICCC.ICCCListenerUpdateReceiver) + ICCC.Registrar(ICCC.ChannelICCC, `ICCC::Ping`, ICCC.ICCCPingReceiver) + ICCC.Registrar(ICCC.ChannelICCC, `ICCC::GetHosts`, ICCC.ICCCGetHostsReceiver) + ICCC.Registrar(ICCC.ChannelICCC, `ICCC::GetListeners`, ICCC.ICCCGetListenersReceiver) + ICCC.Registrar(ICCC.ChannelNUMGEN, `NumGen::Next`, NumGen.ICCCNextNumberReceiver) // Start the ICCC Listener Cache: ICCC.InitCacheNow() // Blocking, until the job is done diff --git a/WebServer/Start.go b/WebServer/Start.go index c95737a..a09bf68 100644 --- a/WebServer/Start.go +++ b/WebServer/Start.go @@ -28,7 +28,7 @@ func Start() { } // Notify the whole cluster, that this server is now up and ready: - answers := ICCC.WriteMessage2All(ICCC.ChannelSYSTEM, `System::Start`, data, SystemMessages.ICCCDefaultAnswer{}) + answers := ICCC.WriteMessage2All(ICCC.ChannelSTARTUP, `System::OceanStart`, data, SystemMessages.ICCCDefaultAnswer{}) for n, obj := range answers { if obj != nil { answer := obj.(SystemMessages.ICCCDefaultAnswer)