Improved ICCC

It is now necessary, that every command sends an answer.
This commit is contained in:
Thorsten Sommer 2015-06-21 20:18:23 +02:00
parent dcc1af2a23
commit 2bd612da48
13 changed files with 188 additions and 46 deletions

View File

@ -2,12 +2,24 @@ package ICCC
import ( import (
"fmt" "fmt"
"github.com/SommerEngineering/Ocean/Log"
LM "github.com/SommerEngineering/Ocean/Log/Meta"
"reflect" "reflect"
"strconv" "strconv"
) )
// Function to convert the HTTP data back to a message. // Function to convert the HTTP data back to a message.
func Data2Message(target interface{}, data map[string][]string) (channel, command string, obj interface{}) { func Data2Message(target interface{}, data map[string][]string) (channel, command string, obj interface{}) {
defer func() {
if err := recover(); err != nil {
channel = ``
command = ``
obj = nil
Log.LogFull(senderName, LM.CategorySYSTEM, LM.LevelERROR, LM.SeverityUnknown, LM.ImpactUnknown, LM.MessageNamePARSE, fmt.Sprintf("Was not able to convert the HTTP values to a message. %s", err))
return
}
}()
if data == nil || len(data) == 0 { if data == nil || len(data) == 0 {
channel = `` channel = ``
command = `` command = ``
@ -15,11 +27,23 @@ func Data2Message(target interface{}, data map[string][]string) (channel, comman
return return
} }
// Use reflection for the target type: // By using reflection, determine the type:
element := reflect.ValueOf(target) elementType := reflect.TypeOf(target)
element = element.Elem()
elementType := element.Type()
// Is it a pointer?
if elementType.Kind() == reflect.Ptr {
// Get the value behind the pointer:
elementType = elementType.Elem()
}
// ICCC works with structs! If this is not a struct, its an error.
if elementType.Kind() != reflect.Struct {
Log.LogFull(senderName, LM.CategorySYSTEM, LM.LevelERROR, LM.SeverityMiddle, LM.ImpactMiddle, LM.MessageNamePARSE, `Was not able to transform HTTP data to a message, because the given data was not a struct.`)
return
}
// By using reflection, create a new instance:
element := reflect.New(elementType).Elem()
channel = data[`channel`][0] channel = data[`channel`][0]
command = data[`command`][0] command = data[`command`][0]
@ -124,6 +148,6 @@ func Data2Message(target interface{}, data map[string][]string) (channel, comman
} }
} }
obj = target obj = element.Interface()
return return
} }

View File

@ -6,9 +6,11 @@ import (
LM "github.com/SommerEngineering/Ocean/Log/Meta" LM "github.com/SommerEngineering/Ocean/Log/Meta"
"github.com/SommerEngineering/Ocean/Tools" "github.com/SommerEngineering/Ocean/Tools"
"net/http" "net/http"
"net/url"
) )
// The HTTP handler for ICCC. // The HTTP handler for the local ICCC commands. Will used in case, that another server
// want to utelise an command from this server.
func ICCCHandler(response http.ResponseWriter, request *http.Request) { func ICCCHandler(response http.ResponseWriter, request *http.Request) {
// Cannot parse the form? // Cannot parse the form?
@ -49,8 +51,17 @@ func ICCCHandler(response http.ResponseWriter, request *http.Request) {
if listener == nil { if listener == nil {
// Case: No such listener // Case: No such listener
Log.LogFull(senderName, LM.CategorySYSTEM, LM.LevelWARN, LM.SeverityCritical, LM.ImpactUnknown, LM.MessageNameCONFIGURATION, `Was not able to find the correct listener for these ICCC message.`, `channel=`+channel, `command`+command, `hostname=`+Tools.ThisHostname()) Log.LogFull(senderName, LM.CategorySYSTEM, LM.LevelWARN, LM.SeverityCritical, LM.ImpactUnknown, LM.MessageNameCONFIGURATION, `Was not able to find the correct listener for these ICCC message.`, `channel=`+channel, `command`+command, `hostname=`+Tools.ThisHostname())
http.NotFound(response, request)
} else { } else {
// Case: Everything is fine => deliver the message // Case: Everything is fine => deliver the message and read the answer:
listener(messageData) answersData := listener(messageData)
if answersData != nil {
// Convert the answer to HTTP form values:
values := url.Values(answersData)
answersString := values.Encode()
// Write the answer to the other peer:
fmt.Fprintf(response, "%s", answersString)
}
} }
} }

View File

@ -16,7 +16,7 @@ func init() {
cacheListenerDatabase = list.New() cacheListenerDatabase = list.New()
// Create a mapping as cache for all local listener end-points (functions): // Create a mapping as cache for all local listener end-points (functions):
listeners = make(map[string]func(data map[string][]string)) listeners = make(map[string]func(data map[string][]string) map[string][]string)
// Using the local IP address: // Using the local IP address:
correctAddressWithPort = Tools.LocalIPAddressAndPort() correctAddressWithPort = Tools.LocalIPAddressAndPort()

View File

@ -2,12 +2,21 @@ package ICCC
import ( import (
"fmt" "fmt"
"github.com/SommerEngineering/Ocean/Log"
LM "github.com/SommerEngineering/Ocean/Log/Meta"
"reflect" "reflect"
"strconv" "strconv"
) )
// Function to convert an ICCC message to HTTP data. // Function to convert an ICCC message to HTTP data.
func message2Data(channel, command string, message interface{}) (data map[string][]string) { func Message2Data(channel, command string, message interface{}) (data map[string][]string) {
defer func() {
if err := recover(); err != nil {
Log.LogFull(senderName, LM.CategorySYSTEM, LM.LevelERROR, LM.SeverityUnknown, LM.ImpactUnknown, LM.MessageNamePARSE, fmt.Sprintf("Was not able to convert the message to HTTP values. %s", err))
data = make(map[string][]string, 0)
return
}
}()
// Create the map: // Create the map:
data = make(map[string][]string) data = make(map[string][]string)

View File

@ -6,8 +6,8 @@ import (
LM "github.com/SommerEngineering/Ocean/Log/Meta" LM "github.com/SommerEngineering/Ocean/Log/Meta"
) )
// Register a command to ICCC for a specific channel. // Register an local command to ICCC for a specific channel.
func Registrar(channel, command string, callback func(data map[string][]string)) { func Registrar(channel, command string, callback func(data map[string][]string) map[string][]string) {
listenersLock.Lock() listenersLock.Lock()
defer listenersLock.Unlock() defer listenersLock.Unlock()

View File

@ -5,12 +5,13 @@ import (
"github.com/SommerEngineering/Ocean/Log" "github.com/SommerEngineering/Ocean/Log"
LM "github.com/SommerEngineering/Ocean/Log/Meta" LM "github.com/SommerEngineering/Ocean/Log/Meta"
"github.com/SommerEngineering/Ocean/Tools" "github.com/SommerEngineering/Ocean/Tools"
"io/ioutil"
"net/http" "net/http"
"net/url" "net/url"
) )
// Send a message to a listener. // Send a message to a listener.
func sendMessage(listener Scheme.Listener, data map[string][]string) { func sendMessage(listener Scheme.Listener, data map[string][]string) (result map[string][]string) {
// Convert the data and encode it: // Convert the data and encode it:
valuesHTTP := url.Values(data) valuesHTTP := url.Values(data)
@ -18,9 +19,25 @@ func sendMessage(listener Scheme.Listener, data map[string][]string) {
valuesHTTP.Add(`InternalCommPassword`, Tools.InternalCommPassword()) valuesHTTP.Add(`InternalCommPassword`, Tools.InternalCommPassword())
// Try to deliver the message: // Try to deliver the message:
if _, err := http.PostForm(`http://`+listener.IPAddressPort+`/ICCC`, valuesHTTP); err != nil { if response, err := http.PostForm(`http://`+listener.IPAddressPort+`/ICCC`, valuesHTTP); err != nil {
// Case: Was not possible to deliver. // Case: Was not possible to deliver.
Log.LogFull(senderName, LM.CategorySYSTEM, LM.LevelERROR, LM.SeverityCritical, LM.ImpactUnknown, LM.MessageNameNETWORK, `Was not able to send the ICCC message.`, err.Error()) Log.LogFull(senderName, LM.CategorySYSTEM, LM.LevelERROR, LM.SeverityCritical, LM.ImpactUnknown, LM.MessageNameNETWORK, `Was not able to send the ICCC message.`, err.Error())
} else {
// Case: Delivery was fine.
defer response.Body.Close()
if responseData, err := ioutil.ReadAll(response.Body); err != nil {
// Case: Was not possible to read the answer.
Log.LogFull(senderName, LM.CategorySYSTEM, LM.LevelERROR, LM.SeverityCritical, LM.ImpactUnknown, LM.MessageNameNETWORK, `Was not able to read the ICCC answer.`, err.Error())
} else {
// Case: Was able to read the answer.
if dataObj, errObj := url.ParseQuery(string(responseData)); errObj != nil {
// Case: Was not able to parse the answer to values.
Log.LogFull(senderName, LM.CategorySYSTEM, LM.LevelERROR, LM.SeverityCritical, LM.ImpactUnknown, LM.MessageNameNETWORK, `Was not able to parse the answer to values.`, errObj.Error())
} else {
// Case: Everything was fine.
result = map[string][]string(dataObj)
}
}
} }
return return

View File

@ -0,0 +1,7 @@
package SystemMessages
// The type for any answer, which can be extended by using CommandData.
type DefaultAnswer struct {
CommandSuccessful bool
CommandAnswer int64
}

View File

@ -0,0 +1,7 @@
package SystemMessages
var (
AnswerACK DefaultAnswer = DefaultAnswer{true, 0} // The command was successful
AnswerNACK DefaultAnswer = DefaultAnswer{false, 0} // The command was not successful
AnswerUNKNOWN DefaultAnswer = DefaultAnswer{false, -1} // The answer is unknown e.g. an error while reading the answer (HTTP errors, etc.)
)

View File

@ -23,7 +23,7 @@ var (
collectionListener *mgo.Collection = nil // The database collection for listeners collectionListener *mgo.Collection = nil // The database collection for listeners
collectionHosts *mgo.Collection = nil // The database collection for hosts collectionHosts *mgo.Collection = nil // The database collection for hosts
reservedSystemChannels []string = []string{ChannelSYSTEM, ChannelNUMGEN, ChannelSHUTDOWN, ChannelSTARTUP, ChannelICCC} // The reserved and pre-defined system channels reservedSystemChannels []string = []string{ChannelSYSTEM, ChannelNUMGEN, ChannelSHUTDOWN, ChannelSTARTUP, ChannelICCC} // The reserved and pre-defined system channels
listeners map[string]func(data map[string][]string) = nil // The listener cache for all local available listeners with local functions listeners map[string]func(data map[string][]string) map[string][]string = nil // The listener cache for all local available listeners with local functions
listenersLock sync.RWMutex = sync.RWMutex{} // The mutex for the listener cache listenersLock sync.RWMutex = sync.RWMutex{} // The mutex for the listener cache
cacheListenerDatabase *list.List = nil // The globally cache for all listeners from all servers cacheListenerDatabase *list.List = nil // The globally cache for all listeners from all servers
cacheListenerDatabaseLock sync.RWMutex = sync.RWMutex{} // The mutex for the globally cache cacheListenerDatabaseLock sync.RWMutex = sync.RWMutex{} // The mutex for the globally cache

View File

@ -7,13 +7,15 @@ import (
) )
// Function to broadcast a message to all listeners. // Function to broadcast a message to all listeners.
func WriteMessage2All(channel, command string, message interface{}) { func WriteMessage2All(channel, command string, message interface{}, answerPrototype interface{}) (results []interface{}) {
cacheListenerDatabaseLock.RLock() cacheListenerDatabaseLock.RLock()
defer cacheListenerDatabaseLock.RUnlock() defer cacheListenerDatabaseLock.RUnlock()
// Convert the message to HTTP data: // Convert the message to HTTP data:
data := message2Data(channel, command, message) data := Message2Data(channel, command, message)
counter := 0
// Store all matching listener:
matchingListener := make([]Scheme.Listener, 0)
// Loop over all listeners which are currently available at the cache: // Loop over all listeners which are currently available at the cache:
for entry := cacheListenerDatabase.Front(); entry != nil; entry = entry.Next() { for entry := cacheListenerDatabase.Front(); entry != nil; entry = entry.Next() {
@ -21,14 +23,40 @@ func WriteMessage2All(channel, command string, message interface{}) {
// If the channel and the command matches, deliver the message: // If the channel and the command matches, deliver the message:
if listener.Channel == channel && listener.Command == command { if listener.Channel == channel && listener.Command == command {
go sendMessage(listener, data) matchingListener = append(matchingListener, listener)
counter++
} }
} }
// Was not able to deliver to any listener? // Was not able to find any matching listener?
if counter == 0 { if len(matchingListener) == 0 {
Log.LogFull(senderName, LM.CategorySYSTEM, LM.LevelWARN, LM.SeverityCritical, LM.ImpactUnknown, LM.MessageNameCONFIGURATION, `It was not able to deliver this message, because no listener was found!`, `channel=`+channel, `command=`+command) Log.LogFull(senderName, LM.CategorySYSTEM, LM.LevelWARN, LM.SeverityCritical, LM.ImpactUnknown, LM.MessageNameCONFIGURATION, `It was not able to deliver this message, because no listener was found!`, `channel=`+channel, `command=`+command)
return
}
// Create an channel to receive all the answers from all listeners:
answerChannel := make(chan map[string][]string, len(matchingListener))
// Start for every listener an own thread:
for _, listener := range matchingListener {
go func() {
answerChannel <- sendMessage(listener, data)
}()
}
// Reserve memory for the result data:
results = make([]interface{}, len(matchingListener))
// Read all answers:
for n := 0; n < len(matchingListener); n++ {
//
// We use no timeout here. This way, it is also possible to execute
// long running commands by ICCC. The caller can abort, if necessary.
//
answersData := <-answerChannel
// Convert the data to the message type. The call will use answerPrototype
// just as a prototype and will create a new instance for every call:
_, _, results[n] = Data2Message(answerPrototype, answersData)
} }
return return

View File

@ -8,12 +8,12 @@ import (
) )
// Function to write a message to any listener. // Function to write a message to any listener.
func WriteMessage2Any(channel, command string, message interface{}) { func WriteMessage2Any(channel, command string, message interface{}, answerPrototype interface{}) (result interface{}) {
cacheListenerDatabaseLock.RLock() cacheListenerDatabaseLock.RLock()
defer cacheListenerDatabaseLock.RUnlock() defer cacheListenerDatabaseLock.RUnlock()
// Convert the message to HTTP data: // Convert the message to HTTP data:
data := message2Data(channel, command, message) data := Message2Data(channel, command, message)
maxCount := cacheListenerDatabase.Len() maxCount := cacheListenerDatabase.Len()
entries := make([]Scheme.Listener, 0, maxCount) entries := make([]Scheme.Listener, 0, maxCount)
counter := 0 counter := 0
@ -31,16 +31,25 @@ func WriteMessage2Any(channel, command string, message interface{}) {
count := len(entries) count := len(entries)
if count > 0 { if count > 0 {
// Case: Find at least one possible listener. Choose a random one and deliver: //
// Case: Find at least one possible listener.
//
// Case: There is only 1 listener
if len(entries) == 1 { if len(entries) == 1 {
listener := entries[0] listener := entries[0]
go sendMessage(listener, data) answersData := sendMessage(listener, data)
_, _, result = Data2Message(answerPrototype, answersData)
} else { } else {
// Case: Multiple listeners are available. Choose a random one and deliver:
listener := entries[Tools.RandomInteger(count)] listener := entries[Tools.RandomInteger(count)]
go sendMessage(listener, data) answersData := sendMessage(listener, data)
_, _, result = Data2Message(answerPrototype, answersData)
} }
} else { } else {
// Case: Find no listener at all. // Case: Find no listener at all.
Log.LogFull(senderName, LM.CategorySYSTEM, LM.LevelWARN, LM.SeverityCritical, LM.ImpactUnknown, LM.MessageNameCONFIGURATION, `It was not able to deliver this message to any listener, because no listener was found!`, `channel=`+channel, `command=`+command) Log.LogFull(senderName, LM.CategorySYSTEM, LM.LevelWARN, LM.SeverityCritical, LM.ImpactUnknown, LM.MessageNameCONFIGURATION, `It was not able to deliver this message to any listener, because no listener was found!`, `channel=`+channel, `command=`+command)
} }
return
} }

View File

@ -1,6 +1,7 @@
package System package System
import ( import (
"fmt"
"github.com/SommerEngineering/Ocean/ICCC" "github.com/SommerEngineering/Ocean/ICCC"
"github.com/SommerEngineering/Ocean/ICCC/SystemMessages" "github.com/SommerEngineering/Ocean/ICCC/SystemMessages"
"github.com/SommerEngineering/Ocean/Log" "github.com/SommerEngineering/Ocean/Log"
@ -8,14 +9,36 @@ import (
) )
// The receiver function for the ICCC message, that a server is up and running. // The receiver function for the ICCC message, that a server is up and running.
func icccSystemStart(data map[string][]string) { func icccSystemStart(data map[string][]string) (result map[string][]string) {
// Recover from errors:
defer func() {
if err := recover(); err != nil {
Log.LogFull(senderName, LM.CategorySYSTEM, LM.LevelERROR, LM.SeverityUnknown, LM.ImpactUnknown, LM.MessageNamePARSE, fmt.Sprintf("Was not able to execute the ICCC server startup message. %s", err))
result = make(map[string][]string, 0)
return
}
}()
// Converts the HTTP form data into an object: // Converts the HTTP form data into an object:
_, _, obj := ICCC.Data2Message(&SystemMessages.ICCCStartUpMessage{}, data) _, _, obj := ICCC.Data2Message(SystemMessages.ICCCStartUpMessage{}, data)
// Was it possible to convert the data?
if obj != nil {
// Cast the object to the right type: // Cast the object to the right type:
messageData := obj.(*SystemMessages.ICCCStartUpMessage) messageData := obj.(SystemMessages.ICCCStartUpMessage)
// Provide a log entry: // Provide a log entry:
Log.LogShort(senderName, LM.CategorySYSTEM, LM.LevelINFO, LM.MessageNameSTARTUP, `ICCC message: The server is now up and ready.`, messageData.PublicIPAddressAndPort, messageData.AdminIPAddressAndPort) Log.LogShort(senderName, LM.CategorySYSTEM, LM.LevelINFO, LM.MessageNameSTARTUP, `ICCC message: The server is now up and ready.`, messageData.PublicIPAddressAndPort, messageData.AdminIPAddressAndPort)
// An answer is necessary:
return ICCC.Message2Data("", "", SystemMessages.AnswerACK)
} else {
Log.LogShort(senderName, LM.CategorySYSTEM, LM.LevelINFO, LM.MessageNameSTARTUP, `ICCC message: Was not able to create the message.`)
fmt.Println(`[Error] ICCC message: Was not able to create the message.`)
}
// In any other error case:
result = make(map[string][]string, 0)
return
} }

View File

@ -1,6 +1,7 @@
package WebServer package WebServer
import ( import (
"fmt"
"github.com/SommerEngineering/Ocean/ICCC" "github.com/SommerEngineering/Ocean/ICCC"
"github.com/SommerEngineering/Ocean/ICCC/SystemMessages" "github.com/SommerEngineering/Ocean/ICCC/SystemMessages"
"github.com/SommerEngineering/Ocean/Log" "github.com/SommerEngineering/Ocean/Log"
@ -27,5 +28,11 @@ func Start() {
} }
// Notify the whole cluster, that this server is now up and ready: // Notify the whole cluster, that this server is now up and ready:
ICCC.WriteMessage2All(ICCC.ChannelSYSTEM, `System::Start`, data) answers := ICCC.WriteMessage2All(ICCC.ChannelSYSTEM, `System::Start`, data, SystemMessages.DefaultAnswer{})
for n, obj := range answers {
if obj != nil {
answer := obj.(SystemMessages.DefaultAnswer)
Log.LogShort(senderName, LM.CategorySYSTEM, LM.LevelINFO, LM.MessageNameSTARTUP, fmt.Sprintf("An answer to the ICCC start up message: Successful=%v, Status=%d, Answer=%d/%d", answer.CommandSuccessful, answer.CommandAnswer, n+1, len(answers)))
}
}
} }