From 0f03f24b5eae329c31d70da7852b400ebea48f6e Mon Sep 17 00:00:00 2001 From: Ethereal Date: Mon, 21 Feb 2011 15:02:25 -0700 Subject: [PATCH] Decision time again, as per usual . . . There are two possibilities that have been previously discussed for the SHM- based communication system. One is to use a slightly "gappy" system, and to do all reads and writes from/to the actual SHM itself; the other is to prepare the packet data beforehand, and then copy the entire packet over to the SHM. The first approach wastes a small amount of memory in the SHM (reducing the buffer size by a small amount) but greatly saves on CPU time. The second is far easier to implement and saves a small amount of memory, but takes far more CPU time. The question is, which one? --- TODO | 1 - include/monitor/SHMReader.h | 4 ++- include/monitor/ZoneReader.h | 30 +++++++++++++++++ include/shm/ZoneHeader.h | 13 ++------ include/storage/RTree.h | 6 ++-- modules/informer/src/collector/Informer.c | 54 +++++++++++++++++++++++-------- src/monitor/Aesalon.cpp | 2 +- src/monitor/ArgumentParser.cpp | 4 +-- src/monitor/Launcher.cpp | 2 ++ src/monitor/SHMReader.cpp | 29 ++++++++++++++--- src/monitor/ZoneReader.cpp | 53 ++++++++++++++++++++++++++++++ 11 files changed, 162 insertions(+), 36 deletions(-) create mode 100644 include/monitor/ZoneReader.h create mode 100644 src/monitor/ZoneReader.cpp diff --git a/TODO b/TODO index d9c53bf..13ed2a2 100644 --- a/TODO +++ b/TODO @@ -2,7 +2,6 @@ - Implement. ==== Monitor ==== -- Implement a unified messaging system. ==== Marshaller ==== - Allow access to the marshaller list for inter-module communication via "setup" function, to save function pointers etc. diff --git a/include/monitor/SHMReader.h b/include/monitor/SHMReader.h index fa4dd74..7405333 100644 --- a/include/monitor/SHMReader.h +++ b/include/monitor/SHMReader.h @@ -35,9 +35,11 @@ public: ~SHMReader(); uint32_t zoneCount(); - uint8_t *zoneWithData(); + int32_t zoneWithData(); void waitForPacket(); + + void readData(uint32_t zoneID, void *buffer, uint32_t size); private: uint8_t *getZone(uint32_t id); void *mapRegion(uint32_t start, uint32_t size); diff --git a/include/monitor/ZoneReader.h b/include/monitor/ZoneReader.h new file mode 100644 index 0000000..9977ad3 --- /dev/null +++ b/include/monitor/ZoneReader.h @@ -0,0 +1,30 @@ +/** Aesalon, a tool to visualize program behaviour in real time. + Copyright (C) 2009-2011, Aesalon development team. + + Aesalon is distributed under the terms of the GNU GPLv3. See + the included file LICENSE for more information. + + @file include/monitor/ZoneReader.h +*/ + +#ifndef AesalonMonitor_ZoneReader_H +#define AesalonMonitor_ZoneReader_H + +#include "monitor/SHMReader.h" + +namespace Monitor { + +class ZoneReader { +private: + SHMReader *m_shmReader; +public: + ZoneReader(SHMReader *shmReader); + ~ZoneReader(); + +private: + static void *run(void *voidInstance); +}; + +} // namespace Monitor + +#endif diff --git a/include/shm/ZoneHeader.h b/include/shm/ZoneHeader.h index 724bec7..23c188f 100644 --- a/include/shm/ZoneHeader.h +++ b/include/shm/ZoneHeader.h @@ -15,9 +15,10 @@ #ifdef __cplusplus namespace SHM { - +#define ZoneDataOffset (sizeof(SHM::ZoneHeader)+16) struct ZoneHeader { #else +#define ZoneDataOffset (sizeof(ZoneHeader)+16) typedef struct ZoneHeader ZoneHeader; struct ZoneHeader { #endif @@ -37,22 +38,12 @@ struct ZoneHeader { /** The overflow semaphore, see @a overflow for description. */ sem_t overflowSemaphore; - /** The size of the end-gap. This is used when a size larger than the remaining space on the end of the - circular buffer is required. - - A zero value indicates either the absence of a gap or the absence of a packet in that region of memory. - This value should be cleared whenever the gap is read. - */ - uint32_t gapSize; - /** The ID of the process that the zone's data comes from. */ uint32_t processID; /** The ID of the thread that the zone's data comes from. */ uint32_t threadID; }; -#define ZoneDataOffset (sizeof(ZoneHeader)+16) - #ifdef __cplusplus } // namespace SHM #endif diff --git a/include/storage/RTree.h b/include/storage/RTree.h index 936d0fb..d88c9fd 100644 --- a/include/storage/RTree.h +++ b/include/storage/RTree.h @@ -29,8 +29,10 @@ namespace Storage { - Dimensions should be as small as possible; many linear operations take place on this number. Six is a reasonable upper bound. - FloatKey should be a version of Key that supports floating-point arithmetic (or at least non-integer division). - Unless otherwise noted, all algorithms are from the publication - R-Trees: A Dynamic Index Structure for Spatial Searching [Guttman, A]. + Unless otherwise noted, all algorithms are from + A Guttman, R-Trees: A Dynamic Index Structure for Spatial Searching, 1984. + + @todo Improve condenseTree's handling of non-leaf nodes. */ template diff --git a/modules/informer/src/collector/Informer.c b/modules/informer/src/collector/Informer.c index c807ccb..ec1357c 100644 --- a/modules/informer/src/collector/Informer.c +++ b/modules/informer/src/collector/Informer.c @@ -52,7 +52,7 @@ static THREAD_SPECIFIC PacketHeader *AI_ZonePacket = NULL; static void AI_SetupSHM(); static void *AI_MapSHM(uint32_t start, uint32_t size); static void *AI_SetupZone(); -static void *AI_ReserveSpace(); +static void *AI_ReserveSpace(uint32_t size); void AI_SetupSHM() { const char *shmName = getenv("AesalonSHMName"); @@ -70,21 +70,15 @@ void AI_SetupSHM() { } void *AI_MapSHM(uint32_t start, uint32_t size) { - struct stat s; - printf("shmFd: %i\n", AI_InformerData.shmFd); - if(fstat(AI_InformerData.shmFd, &s) != 0) { - fprintf(stderr, "Could not fstat shared memory to determine size: %s\n", strerror(errno)); - exit(1); - } - - if(s.st_size > (start+size) * AesalonPageSize) { - if(AI_InformerData.shmHeader) sem_wait(&AI_InformerData.shmHeader->resizeSemaphore); + if(AI_InformerData.shmHeader != NULL && AI_InformerData.shmHeader->shmSize < (start+size)) { + sem_wait(&AI_InformerData.shmHeader->resizeSemaphore); if(ftruncate(AI_InformerData.shmFd, (start+size) * AesalonPageSize) != 0) { fprintf(stderr, "Could not resize shared memory."); + exit(1); } - if(AI_InformerData.shmHeader) sem_post(&AI_InformerData.shmHeader->resizeSemaphore); + sem_post(&AI_InformerData.shmHeader->resizeSemaphore); } void *memory = @@ -139,8 +133,32 @@ static void *AI_SetupZone() { return AI_Zone; } -static void *AI_ReserveSpace() { - return NULL; +static void *AI_ReserveSpace(uint32_t size) { + ZoneHeader *header = (ZoneHeader *)AI_Zone; + uint32_t remaining; + if(header->head <= header->tail) { + remaining = ((AI_InformerData.shmHeader->zoneSize*AesalonPageSize) - ZoneDataOffset) + - (header->tail - header->head); + } + else { + remaining = header->head - ZoneDataOffset - header->tail; + } + + if(remaining < size) { + header->overflow = size - remaining; + sem_wait(&header->overflowSemaphore); + } + + /* If the head is less than (or equal to) the tail, then the used memory + is in one contiguous chunk, and the buffer has not wrapped yet. */ + if(header->head <= header->tail) { + header->tail += size; + return &AI_Zone[header->tail-size]; + } + else { + printf("**** Second case, returning NULL.\n"); + return NULL; + } } void __attribute__((constructor)) AI_Construct() { @@ -151,6 +169,10 @@ void __attribute__((constructor)) AI_Construct() { AI_SetupSHM(); + AI_InformerData.threadList = malloc(sizeof(pthread_t) * 16); + AI_InformerData.threadListSize = 16; + AI_InformerData.threadCount = 1; + AI_InformerData.threadList[0] = self; AI_ContinueCollection(self); } @@ -162,10 +184,14 @@ void __attribute__((destructor)) AI_Destruct() { void AC_EXPORT AI_StartPacket(ModuleID moduleID) { if(AI_Zone == NULL) AI_SetupZone(); AI_ZonePacket = AI_ReserveSpace(sizeof(PacketHeader)); + AI_ZonePacket->packetSize = 0; + AI_ZonePacket->moduleID = moduleID; + } void AC_EXPORT *AI_PacketSpace(uint32_t size) { - return NULL; + AI_ZonePacket->packetSize += size; + return AI_ReserveSpace(size); } void AC_EXPORT AI_EndPacket() { diff --git a/src/monitor/Aesalon.cpp b/src/monitor/Aesalon.cpp index e9e60b6..142b2bf 100644 --- a/src/monitor/Aesalon.cpp +++ b/src/monitor/Aesalon.cpp @@ -13,7 +13,7 @@ #include "storage/RTree.h" -#if 0 +#if 1 int main(int argc, char *argv[]) { Monitor::Coordinator coordinator(argv); coordinator.run(); diff --git a/src/monitor/ArgumentParser.cpp b/src/monitor/ArgumentParser.cpp index f24bc86..e4d49b5 100644 --- a/src/monitor/ArgumentParser.cpp +++ b/src/monitor/ArgumentParser.cpp @@ -12,6 +12,7 @@ #include "monitor/ArgumentParser.h" #include "config/Parser.h" +#include "util/MessageSystem.h" namespace Monitor { @@ -51,8 +52,7 @@ int ArgumentParser::parse(Config::Vault *vault, char **argv) { vault->set("::list-attributes", "true"); } else if(std::strncmp(argv[arg], "--", 2) == 0) { - std::cout << "Unknown argument \"" << argv[arg] << "\".\n"; - // TODO: do something about the error. + Message(Warning, "Unknown argument \"" << argv[arg] << "\"."); } else break; } diff --git a/src/monitor/Launcher.cpp b/src/monitor/Launcher.cpp index 731e5df..9570779 100644 --- a/src/monitor/Launcher.cpp +++ b/src/monitor/Launcher.cpp @@ -28,6 +28,8 @@ Launcher::~Launcher() { void Launcher::launch() { forkTarget(); + + } void Launcher::forkTarget() { diff --git a/src/monitor/SHMReader.cpp b/src/monitor/SHMReader.cpp index 1293ed1..b034036 100644 --- a/src/monitor/SHMReader.cpp +++ b/src/monitor/SHMReader.cpp @@ -43,7 +43,7 @@ uint32_t SHMReader::zoneCount() { return m_header->zoneCount; } -uint8_t *SHMReader::zoneWithData() { +int32_t SHMReader::zoneWithData() { for(uint32_t i = 0; i < m_header->zonesAllocated; i ++) { if(m_zoneUseData[i % 8] & (0x01 << (i % 8))) { uint8_t *zone = getZone(i); @@ -57,16 +57,37 @@ uint8_t *SHMReader::zoneWithData() { if(sem_wait(&zheader->packetSemaphore) == -1 && errno == EAGAIN) continue; - return zone; + return i; } } - return NULL; + return -1; } void SHMReader::waitForPacket() { sem_wait(&m_header->packetSemaphore); } +void SHMReader::readData(uint32_t zoneID, void *buffer, uint32_t size) { + uint8_t *zone = getZone(zoneID); + + SHM::ZoneHeader *header = reinterpret_cast(zone); + + /* Wait for the data to become available . . . */ + + + /* Copy the data. */ + uint32_t start1 = header->head + ZoneDataOffset; + uint32_t size1 = std::min(size, (m_header->zoneSize*AesalonPageSize)-start1); + + uint32_t size2 = size - size1; + uint32_t start2 = ZoneDataOffset; + + memcpy(buffer, zone + start1, size1); + if(size2 > 0) { + memcpy(static_cast(buffer) + size1, zone + start2, size2); + } +} + uint8_t *SHMReader::getZone(uint32_t id) { if(id < m_zoneList.size() && m_zoneList[id] != NULL) { return m_zoneList[id]; @@ -82,7 +103,7 @@ uint8_t *SHMReader::getZone(uint32_t id) { } void *SHMReader::mapRegion(uint32_t start, uint32_t size) { - if(m_header == NULL || m_header->shmSize < (start+size) * AesalonPageSize) { + if(m_header == NULL || m_header->shmSize < (start+size)) { if(m_header) sem_wait(&m_header->resizeSemaphore); Message(Debug, "Resizing SHM to " << start+size << " page(s)."); diff --git a/src/monitor/ZoneReader.cpp b/src/monitor/ZoneReader.cpp new file mode 100644 index 0000000..6d00494 --- /dev/null +++ b/src/monitor/ZoneReader.cpp @@ -0,0 +1,53 @@ +/** Aesalon, a tool to visualize program behaviour in real time. + Copyright (C) 2009-2011, Aesalon development team. + + Aesalon is distributed under the terms of the GNU GPLv3. See + the included file LICENSE for more information. + + @file src/monitor/ZoneReader.cpp +*/ + +#include "monitor/ZoneReader.h" +#include "shm/PacketHeader.h" +#include "util/MessageSystem.h" + +namespace Monitor { + +ZoneReader::ZoneReader(SHMReader *shmReader) : m_shmReader(shmReader) { + +} + +ZoneReader::~ZoneReader() { + +} + +void *ZoneReader::run(void *voidInstance) { + ZoneReader *instance = static_cast(voidInstance); + SHMReader *reader = instance->m_shmReader; + + uint8_t *dataBuffer = new uint8_t[32]; + uint32_t dataBufferSize = 32; + + while(true) { + int32_t zone = reader->zoneWithData(); + if(zone == -1) break; + + SHM::PacketHeader packetHeader; + reader->readData(zone, &packetHeader, sizeof(packetHeader)); + + if(packetHeader.packetSize > dataBufferSize) { + while(packetHeader.packetSize > dataBufferSize) dataBufferSize *= 2; + delete[] dataBuffer; + dataBuffer = new uint8_t[dataBufferSize]; + } + reader->readData(zone, dataBuffer, packetHeader.packetSize); + + Message(Log, "Recieved packet from module " << packetHeader.moduleID << ", size " << packetHeader.packetSize); + } + + delete[] dataBuffer; + + return NULL; +} + +} // namespace Monitor -- 2.11.4.GIT