From 2bd612da48fbad742e234cac5d898cdfc2771aa9 Mon Sep 17 00:00:00 2001 From: Thorsten Sommer Date: Sun, 21 Jun 2015 20:18:23 +0200 Subject: [PATCH] Improved ICCC It is now necessary, that every command sends an answer. --- ICCC/Data2Message.go | 34 ++++++++++++++++++---- ICCC/HTTPConnector.go | 17 +++++++++-- ICCC/Init.go | 2 +- ICCC/Message2Data.go | 11 +++++++- ICCC/Registrar.go | 4 +-- ICCC/Send.go | 21 ++++++++++++-- ICCC/SystemMessages/DefaultAnswer.go | 7 +++++ ICCC/SystemMessages/Variables.go | 7 +++++ ICCC/Variables.go | 26 ++++++++--------- ICCC/WriteMessage2All.go | 42 +++++++++++++++++++++++----- ICCC/WriteMessage2Any.go | 19 +++++++++---- System/ICCCStart.go | 35 +++++++++++++++++++---- WebServer/Start.go | 9 +++++- 13 files changed, 188 insertions(+), 46 deletions(-) create mode 100644 ICCC/SystemMessages/DefaultAnswer.go create mode 100644 ICCC/SystemMessages/Variables.go diff --git a/ICCC/Data2Message.go b/ICCC/Data2Message.go index 56d3bbc..58b7785 100644 --- a/ICCC/Data2Message.go +++ b/ICCC/Data2Message.go @@ -2,12 +2,24 @@ package ICCC import ( "fmt" + "github.com/SommerEngineering/Ocean/Log" + LM "github.com/SommerEngineering/Ocean/Log/Meta" "reflect" "strconv" ) // Function to convert the HTTP data back to a message. func Data2Message(target interface{}, data map[string][]string) (channel, command string, obj interface{}) { + defer func() { + if err := recover(); err != nil { + channel = `` + command = `` + obj = nil + Log.LogFull(senderName, LM.CategorySYSTEM, LM.LevelERROR, LM.SeverityUnknown, LM.ImpactUnknown, LM.MessageNamePARSE, fmt.Sprintf("Was not able to convert the HTTP values to a message. %s", err)) + return + } + }() + if data == nil || len(data) == 0 { channel = `` command = `` @@ -15,11 +27,23 @@ func Data2Message(target interface{}, data map[string][]string) (channel, comman return } - // Use reflection for the target type: - element := reflect.ValueOf(target) - element = element.Elem() - elementType := element.Type() + // By using reflection, determine the type: + elementType := reflect.TypeOf(target) + // Is it a pointer? + if elementType.Kind() == reflect.Ptr { + // Get the value behind the pointer: + elementType = elementType.Elem() + } + + // ICCC works with structs! If this is not a struct, its an error. + if elementType.Kind() != reflect.Struct { + Log.LogFull(senderName, LM.CategorySYSTEM, LM.LevelERROR, LM.SeverityMiddle, LM.ImpactMiddle, LM.MessageNamePARSE, `Was not able to transform HTTP data to a message, because the given data was not a struct.`) + return + } + + // By using reflection, create a new instance: + element := reflect.New(elementType).Elem() channel = data[`channel`][0] command = data[`command`][0] @@ -124,6 +148,6 @@ func Data2Message(target interface{}, data map[string][]string) (channel, comman } } - obj = target + obj = element.Interface() return } diff --git a/ICCC/HTTPConnector.go b/ICCC/HTTPConnector.go index aa35b58..d40fde5 100644 --- a/ICCC/HTTPConnector.go +++ b/ICCC/HTTPConnector.go @@ -6,9 +6,11 @@ import ( LM "github.com/SommerEngineering/Ocean/Log/Meta" "github.com/SommerEngineering/Ocean/Tools" "net/http" + "net/url" ) -// The HTTP handler for ICCC. +// The HTTP handler for the local ICCC commands. Will used in case, that another server +// want to utelise an command from this server. func ICCCHandler(response http.ResponseWriter, request *http.Request) { // Cannot parse the form? @@ -49,8 +51,17 @@ func ICCCHandler(response http.ResponseWriter, request *http.Request) { if listener == nil { // Case: No such listener Log.LogFull(senderName, LM.CategorySYSTEM, LM.LevelWARN, LM.SeverityCritical, LM.ImpactUnknown, LM.MessageNameCONFIGURATION, `Was not able to find the correct listener for these ICCC message.`, `channel=`+channel, `command`+command, `hostname=`+Tools.ThisHostname()) + http.NotFound(response, request) } else { - // Case: Everything is fine => deliver the message - listener(messageData) + // Case: Everything is fine => deliver the message and read the answer: + answersData := listener(messageData) + if answersData != nil { + // Convert the answer to HTTP form values: + values := url.Values(answersData) + answersString := values.Encode() + + // Write the answer to the other peer: + fmt.Fprintf(response, "%s", answersString) + } } } diff --git a/ICCC/Init.go b/ICCC/Init.go index e525be4..f7fee95 100644 --- a/ICCC/Init.go +++ b/ICCC/Init.go @@ -16,7 +16,7 @@ func init() { cacheListenerDatabase = list.New() // Create a mapping as cache for all local listener end-points (functions): - listeners = make(map[string]func(data map[string][]string)) + listeners = make(map[string]func(data map[string][]string) map[string][]string) // Using the local IP address: correctAddressWithPort = Tools.LocalIPAddressAndPort() diff --git a/ICCC/Message2Data.go b/ICCC/Message2Data.go index f0d4574..bb20921 100644 --- a/ICCC/Message2Data.go +++ b/ICCC/Message2Data.go @@ -2,12 +2,21 @@ package ICCC import ( "fmt" + "github.com/SommerEngineering/Ocean/Log" + LM "github.com/SommerEngineering/Ocean/Log/Meta" "reflect" "strconv" ) // Function to convert an ICCC message to HTTP data. -func message2Data(channel, command string, message interface{}) (data map[string][]string) { +func Message2Data(channel, command string, message interface{}) (data map[string][]string) { + defer func() { + if err := recover(); err != nil { + Log.LogFull(senderName, LM.CategorySYSTEM, LM.LevelERROR, LM.SeverityUnknown, LM.ImpactUnknown, LM.MessageNamePARSE, fmt.Sprintf("Was not able to convert the message to HTTP values. %s", err)) + data = make(map[string][]string, 0) + return + } + }() // Create the map: data = make(map[string][]string) diff --git a/ICCC/Registrar.go b/ICCC/Registrar.go index db3b929..4330235 100644 --- a/ICCC/Registrar.go +++ b/ICCC/Registrar.go @@ -6,8 +6,8 @@ import ( LM "github.com/SommerEngineering/Ocean/Log/Meta" ) -// Register a command to ICCC for a specific channel. -func Registrar(channel, command string, callback func(data map[string][]string)) { +// Register an local command to ICCC for a specific channel. +func Registrar(channel, command string, callback func(data map[string][]string) map[string][]string) { listenersLock.Lock() defer listenersLock.Unlock() diff --git a/ICCC/Send.go b/ICCC/Send.go index 98fa52d..dc41a50 100644 --- a/ICCC/Send.go +++ b/ICCC/Send.go @@ -5,12 +5,13 @@ import ( "github.com/SommerEngineering/Ocean/Log" LM "github.com/SommerEngineering/Ocean/Log/Meta" "github.com/SommerEngineering/Ocean/Tools" + "io/ioutil" "net/http" "net/url" ) // Send a message to a listener. -func sendMessage(listener Scheme.Listener, data map[string][]string) { +func sendMessage(listener Scheme.Listener, data map[string][]string) (result map[string][]string) { // Convert the data and encode it: valuesHTTP := url.Values(data) @@ -18,9 +19,25 @@ func sendMessage(listener Scheme.Listener, data map[string][]string) { valuesHTTP.Add(`InternalCommPassword`, Tools.InternalCommPassword()) // Try to deliver the message: - if _, err := http.PostForm(`http://`+listener.IPAddressPort+`/ICCC`, valuesHTTP); err != nil { + if response, err := http.PostForm(`http://`+listener.IPAddressPort+`/ICCC`, valuesHTTP); err != nil { // Case: Was not possible to deliver. Log.LogFull(senderName, LM.CategorySYSTEM, LM.LevelERROR, LM.SeverityCritical, LM.ImpactUnknown, LM.MessageNameNETWORK, `Was not able to send the ICCC message.`, err.Error()) + } else { + // Case: Delivery was fine. + defer response.Body.Close() + if responseData, err := ioutil.ReadAll(response.Body); err != nil { + // Case: Was not possible to read the answer. + Log.LogFull(senderName, LM.CategorySYSTEM, LM.LevelERROR, LM.SeverityCritical, LM.ImpactUnknown, LM.MessageNameNETWORK, `Was not able to read the ICCC answer.`, err.Error()) + } else { + // Case: Was able to read the answer. + if dataObj, errObj := url.ParseQuery(string(responseData)); errObj != nil { + // Case: Was not able to parse the answer to values. + Log.LogFull(senderName, LM.CategorySYSTEM, LM.LevelERROR, LM.SeverityCritical, LM.ImpactUnknown, LM.MessageNameNETWORK, `Was not able to parse the answer to values.`, errObj.Error()) + } else { + // Case: Everything was fine. + result = map[string][]string(dataObj) + } + } } return diff --git a/ICCC/SystemMessages/DefaultAnswer.go b/ICCC/SystemMessages/DefaultAnswer.go new file mode 100644 index 0000000..63f4957 --- /dev/null +++ b/ICCC/SystemMessages/DefaultAnswer.go @@ -0,0 +1,7 @@ +package SystemMessages + +// The type for any answer, which can be extended by using CommandData. +type DefaultAnswer struct { + CommandSuccessful bool + CommandAnswer int64 +} diff --git a/ICCC/SystemMessages/Variables.go b/ICCC/SystemMessages/Variables.go new file mode 100644 index 0000000..0b2c05c --- /dev/null +++ b/ICCC/SystemMessages/Variables.go @@ -0,0 +1,7 @@ +package SystemMessages + +var ( + AnswerACK DefaultAnswer = DefaultAnswer{true, 0} // The command was successful + AnswerNACK DefaultAnswer = DefaultAnswer{false, 0} // The command was not successful + AnswerUNKNOWN DefaultAnswer = DefaultAnswer{false, -1} // The answer is unknown e.g. an error while reading the answer (HTTP errors, etc.) +) diff --git a/ICCC/Variables.go b/ICCC/Variables.go index c9a4693..dc7a255 100644 --- a/ICCC/Variables.go +++ b/ICCC/Variables.go @@ -17,17 +17,17 @@ const ( ) var ( - senderName LM.Sender = `ICCC` // This is the name for logging event from this package - db *mgo.Database = nil // The database - dbSession *mgo.Session = nil // The database session - collectionListener *mgo.Collection = nil // The database collection for listeners - collectionHosts *mgo.Collection = nil // The database collection for hosts - reservedSystemChannels []string = []string{ChannelSYSTEM, ChannelNUMGEN, ChannelSHUTDOWN, ChannelSTARTUP, ChannelICCC} // The reserved and pre-defined system channels - listeners map[string]func(data map[string][]string) = nil // The listener cache for all local available listeners with local functions - listenersLock sync.RWMutex = sync.RWMutex{} // The mutex for the listener cache - cacheListenerDatabase *list.List = nil // The globally cache for all listeners from all servers - cacheListenerDatabaseLock sync.RWMutex = sync.RWMutex{} // The mutex for the globally cache - startCacheTimerLock sync.Mutex = sync.Mutex{} // Mutex for the start timer - cacheTimerRunning bool = false // Is the timer running? - correctAddressWithPort string = `` // The IP address and port of the this local server + senderName LM.Sender = `ICCC` // This is the name for logging event from this package + db *mgo.Database = nil // The database + dbSession *mgo.Session = nil // The database session + collectionListener *mgo.Collection = nil // The database collection for listeners + collectionHosts *mgo.Collection = nil // The database collection for hosts + reservedSystemChannels []string = []string{ChannelSYSTEM, ChannelNUMGEN, ChannelSHUTDOWN, ChannelSTARTUP, ChannelICCC} // The reserved and pre-defined system channels + listeners map[string]func(data map[string][]string) map[string][]string = nil // The listener cache for all local available listeners with local functions + listenersLock sync.RWMutex = sync.RWMutex{} // The mutex for the listener cache + cacheListenerDatabase *list.List = nil // The globally cache for all listeners from all servers + cacheListenerDatabaseLock sync.RWMutex = sync.RWMutex{} // The mutex for the globally cache + startCacheTimerLock sync.Mutex = sync.Mutex{} // Mutex for the start timer + cacheTimerRunning bool = false // Is the timer running? + correctAddressWithPort string = `` // The IP address and port of the this local server ) diff --git a/ICCC/WriteMessage2All.go b/ICCC/WriteMessage2All.go index 063be44..c92733a 100644 --- a/ICCC/WriteMessage2All.go +++ b/ICCC/WriteMessage2All.go @@ -7,13 +7,15 @@ import ( ) // Function to broadcast a message to all listeners. -func WriteMessage2All(channel, command string, message interface{}) { +func WriteMessage2All(channel, command string, message interface{}, answerPrototype interface{}) (results []interface{}) { cacheListenerDatabaseLock.RLock() defer cacheListenerDatabaseLock.RUnlock() // Convert the message to HTTP data: - data := message2Data(channel, command, message) - counter := 0 + data := Message2Data(channel, command, message) + + // Store all matching listener: + matchingListener := make([]Scheme.Listener, 0) // Loop over all listeners which are currently available at the cache: for entry := cacheListenerDatabase.Front(); entry != nil; entry = entry.Next() { @@ -21,14 +23,40 @@ func WriteMessage2All(channel, command string, message interface{}) { // If the channel and the command matches, deliver the message: if listener.Channel == channel && listener.Command == command { - go sendMessage(listener, data) - counter++ + matchingListener = append(matchingListener, listener) } } - // Was not able to deliver to any listener? - if counter == 0 { + // Was not able to find any matching listener? + if len(matchingListener) == 0 { Log.LogFull(senderName, LM.CategorySYSTEM, LM.LevelWARN, LM.SeverityCritical, LM.ImpactUnknown, LM.MessageNameCONFIGURATION, `It was not able to deliver this message, because no listener was found!`, `channel=`+channel, `command=`+command) + return + } + + // Create an channel to receive all the answers from all listeners: + answerChannel := make(chan map[string][]string, len(matchingListener)) + + // Start for every listener an own thread: + for _, listener := range matchingListener { + go func() { + answerChannel <- sendMessage(listener, data) + }() + } + + // Reserve memory for the result data: + results = make([]interface{}, len(matchingListener)) + + // Read all answers: + for n := 0; n < len(matchingListener); n++ { + // + // We use no timeout here. This way, it is also possible to execute + // long running commands by ICCC. The caller can abort, if necessary. + // + answersData := <-answerChannel + + // Convert the data to the message type. The call will use answerPrototype + // just as a prototype and will create a new instance for every call: + _, _, results[n] = Data2Message(answerPrototype, answersData) } return diff --git a/ICCC/WriteMessage2Any.go b/ICCC/WriteMessage2Any.go index ec72a3d..5b7aa82 100644 --- a/ICCC/WriteMessage2Any.go +++ b/ICCC/WriteMessage2Any.go @@ -8,12 +8,12 @@ import ( ) // Function to write a message to any listener. -func WriteMessage2Any(channel, command string, message interface{}) { +func WriteMessage2Any(channel, command string, message interface{}, answerPrototype interface{}) (result interface{}) { cacheListenerDatabaseLock.RLock() defer cacheListenerDatabaseLock.RUnlock() // Convert the message to HTTP data: - data := message2Data(channel, command, message) + data := Message2Data(channel, command, message) maxCount := cacheListenerDatabase.Len() entries := make([]Scheme.Listener, 0, maxCount) counter := 0 @@ -31,16 +31,25 @@ func WriteMessage2Any(channel, command string, message interface{}) { count := len(entries) if count > 0 { - // Case: Find at least one possible listener. Choose a random one and deliver: + // + // Case: Find at least one possible listener. + // + + // Case: There is only 1 listener if len(entries) == 1 { listener := entries[0] - go sendMessage(listener, data) + answersData := sendMessage(listener, data) + _, _, result = Data2Message(answerPrototype, answersData) } else { + // Case: Multiple listeners are available. Choose a random one and deliver: listener := entries[Tools.RandomInteger(count)] - go sendMessage(listener, data) + answersData := sendMessage(listener, data) + _, _, result = Data2Message(answerPrototype, answersData) } } else { // Case: Find no listener at all. Log.LogFull(senderName, LM.CategorySYSTEM, LM.LevelWARN, LM.SeverityCritical, LM.ImpactUnknown, LM.MessageNameCONFIGURATION, `It was not able to deliver this message to any listener, because no listener was found!`, `channel=`+channel, `command=`+command) } + + return } diff --git a/System/ICCCStart.go b/System/ICCCStart.go index 3df5d9d..9b82cde 100644 --- a/System/ICCCStart.go +++ b/System/ICCCStart.go @@ -1,6 +1,7 @@ package System import ( + "fmt" "github.com/SommerEngineering/Ocean/ICCC" "github.com/SommerEngineering/Ocean/ICCC/SystemMessages" "github.com/SommerEngineering/Ocean/Log" @@ -8,14 +9,36 @@ import ( ) // The receiver function for the ICCC message, that a server is up and running. -func icccSystemStart(data map[string][]string) { +func icccSystemStart(data map[string][]string) (result map[string][]string) { + + // Recover from errors: + defer func() { + if err := recover(); err != nil { + Log.LogFull(senderName, LM.CategorySYSTEM, LM.LevelERROR, LM.SeverityUnknown, LM.ImpactUnknown, LM.MessageNamePARSE, fmt.Sprintf("Was not able to execute the ICCC server startup message. %s", err)) + result = make(map[string][]string, 0) + return + } + }() // Converts the HTTP form data into an object: - _, _, obj := ICCC.Data2Message(&SystemMessages.ICCCStartUpMessage{}, data) + _, _, obj := ICCC.Data2Message(SystemMessages.ICCCStartUpMessage{}, data) - // Cast the object to the right type: - messageData := obj.(*SystemMessages.ICCCStartUpMessage) + // Was it possible to convert the data? + if obj != nil { + // Cast the object to the right type: + messageData := obj.(SystemMessages.ICCCStartUpMessage) - // Provide a log entry: - Log.LogShort(senderName, LM.CategorySYSTEM, LM.LevelINFO, LM.MessageNameSTARTUP, `ICCC message: The server is now up and ready.`, messageData.PublicIPAddressAndPort, messageData.AdminIPAddressAndPort) + // Provide a log entry: + Log.LogShort(senderName, LM.CategorySYSTEM, LM.LevelINFO, LM.MessageNameSTARTUP, `ICCC message: The server is now up and ready.`, messageData.PublicIPAddressAndPort, messageData.AdminIPAddressAndPort) + + // An answer is necessary: + return ICCC.Message2Data("", "", SystemMessages.AnswerACK) + } else { + Log.LogShort(senderName, LM.CategorySYSTEM, LM.LevelINFO, LM.MessageNameSTARTUP, `ICCC message: Was not able to create the message.`) + fmt.Println(`[Error] ICCC message: Was not able to create the message.`) + } + + // In any other error case: + result = make(map[string][]string, 0) + return } diff --git a/WebServer/Start.go b/WebServer/Start.go index 374ef9c..b09547f 100644 --- a/WebServer/Start.go +++ b/WebServer/Start.go @@ -1,6 +1,7 @@ package WebServer import ( + "fmt" "github.com/SommerEngineering/Ocean/ICCC" "github.com/SommerEngineering/Ocean/ICCC/SystemMessages" "github.com/SommerEngineering/Ocean/Log" @@ -27,5 +28,11 @@ func Start() { } // Notify the whole cluster, that this server is now up and ready: - ICCC.WriteMessage2All(ICCC.ChannelSYSTEM, `System::Start`, data) + answers := ICCC.WriteMessage2All(ICCC.ChannelSYSTEM, `System::Start`, data, SystemMessages.DefaultAnswer{}) + for n, obj := range answers { + if obj != nil { + answer := obj.(SystemMessages.DefaultAnswer) + Log.LogShort(senderName, LM.CategorySYSTEM, LM.LevelINFO, LM.MessageNameSTARTUP, fmt.Sprintf("An answer to the ICCC start up message: Successful=%v, Status=%d, Answer=%d/%d", answer.CommandSuccessful, answer.CommandAnswer, n+1, len(answers))) + } + } }