From 60c0470d009918c683043082c879996308b7a941 Mon Sep 17 00:00:00 2001 From: Mohamed Aslan Date: Tue, 19 Jan 2016 17:07:45 -0500 Subject: [PATCH] add murmur3 partitioner --- Makefile | 9 ++++- dht.c | 67 ++++++++++++++++++++++++++++++++- dht.h | 8 ++-- murmur3.c | 117 +++++++++++++++++++++++++++++++++++++++++++++++++++++++++ murmur3.h | 12 ++++++ test.c | 7 ++++ test_murmur3.c | 83 ++++++++++++++++++++++++++++++++++++++++ 7 files changed, 296 insertions(+), 7 deletions(-) create mode 100644 murmur3.c create mode 100644 murmur3.h create mode 100644 test_murmur3.c diff --git a/Makefile b/Makefile index 01a9094..22330d9 100644 --- a/Makefile +++ b/Makefile @@ -1,5 +1,5 @@ LIB= dht -SRCS= dht.c +SRCS= dht.c murmur3.c HDRS= dht.h SHLIB_MAJOR= 0 SHLIB_MINOR= 1 @@ -13,7 +13,7 @@ MLINKS= libdht.3 dht_init.3 \ libdht.3 dht_event_loop.3 \ CFLAGS+= -Wall -Werror -I. -COPTS+= -g +COPTS+= -g -DDEBUG includes: @@ -25,4 +25,9 @@ includes: eval "$$j"; \ done +test: all + @$(CC) test_murmur3.c -o test_murmur3 -L. -ldht -static $(CFLAGS) + @./test_murmur3 + @rm test_murmur3 + .include diff --git a/dht.c b/dht.c index 73091d3..f4c8cb1 100644 --- a/dht.c +++ b/dht.c @@ -14,6 +14,7 @@ * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. */ +#include #include #include #include @@ -22,16 +23,28 @@ #include #include +#include #define TCPBACKLOG 500 #define POSTINIT_WAIT 0 #define POSTINIT_TID 0 +#define MAX_PEERS 64 + #include +static uint64_t +murmur3_partitioner(const char *key) +{ + uint64_t token[2]; + + murmurhash3_x64_128(key, strlen(key), 0, token); + return token[0]; +} + static struct dht_options defopts = { - .partitioner = NULL, + .partitioner = murmur3_partitioner, .ht_init = NULL, .ht_put = NULL, .ht_get = NULL, @@ -47,7 +60,7 @@ dht_init(struct dht_node *node, int port, int n, uint32_t flags, struct dht_opti if (node == NULL) return 0; node->port = port; - node->n_replica = n; + node->n_replicas = n; node->flags = flags; if (opts == NULL) node->opts = &defopts; @@ -75,6 +88,8 @@ dht_init(struct dht_node *node, int port, int n, uint32_t flags, struct dht_opti EV_SET(&kev, node->socket, EVFILT_READ, EV_ADD, 0, 0, NULL); if ((kevent(node->ev_queue, &kev, 1, NULL, 0, NULL)) == -1) goto error; + node->peers = reallocarray(NULL, MAX_PEERS, sizeof(struct dht_peer)); + node->n_peers = 0; node->ready = 1; return 1; error: @@ -82,10 +97,58 @@ error: return 0; } +/* + * - assume array is already sorted + * - O(n) + */ +static int +insert_token_sorted(struct dht_peer peers[], int *n, uint64_t token) +{ + int i, j; + + for (i = 0 ; i < *n ; i++) { + if (peers[i].token > token) + break; + } + for (j = *n ; j > i ; j--) { + peers[j] = peers[j - 1]; + } + peers[i].token = token; + ++*n; + return i; +} + +#ifdef DEBUG +static void +print(struct dht_peer peers[], int n) +{ + int i; + + for (i = 0 ; i < n ; i++) + printf("%llx ", peers[i].token); + printf("\n"); +} +#endif + + void dht_add_peer(struct dht_node *node, const char *ip, int port) { + char *p; + uint64_t token; + int i; + if (node == NULL || ip == NULL) + return; + if (node->n_peers == MAX_PEERS) + return; + asprintf(&p, "%s:%d", ip, port); + token = node->opts->partitioner(p); + i = insert_token_sorted(node->peers, &node->n_peers, token); + node->peers[i].last_recv = 0; +#ifdef DEBUG + print(node->peers, node->n_peers); +#endif } static void diff --git a/dht.h b/dht.h index 5d9c37d..607cb87 100644 --- a/dht.h +++ b/dht.h @@ -23,22 +23,24 @@ struct dht_node { - uint64_t id; + uint64_t token; int port; int socket; int ev_queue; - int n_replica; + int n_replicas; + int n_peers; int def_r; int def_w; uint8_t ready; uint32_t flags; struct sockaddr_storage saddr; struct dht_options *opts; + struct dht_peer *peers; void *ht; }; struct dht_peer { - uint64_t id; + uint64_t token; int sock; time_t last_recv; struct sockaddr_storage s_addr; diff --git a/murmur3.c b/murmur3.c new file mode 100644 index 0000000..25d969b --- /dev/null +++ b/murmur3.c @@ -0,0 +1,117 @@ +/* + * based on code written by Austin Appleby and placed in the public domain + */ + +/* TODO: x86 and big endian */ + +#include + +#define ROTL64(x,n) (((x) << (n)) | ((x) >> (64 - (n)))) + +static inline uint64_t +getblock64(const uint64_t *p, int i) +{ + return p[i]; +} + +static inline uint64_t +fmix64(uint64_t k) +{ + k ^= k >> 33; + k *= 0xff51afd7ed558ccdLLU; + k ^= k >> 33; + k *= 0xc4ceb9fe1a85ec53LLU; + k ^= k >> 33; + return k; +} + +void +murmurhash3_x64_128(const void *key, const int len, const uint32_t seed, void *out) +{ + const uint8_t *data = (const uint8_t *)key; + const uint64_t *blocks = (const uint64_t *)(data); + const int nblocks = len / 16; + const uint64_t c1 = 0x87c37b91114253d5LLU; + const uint64_t c2 = 0x4cf5ad432745937fLLU; + const uint8_t *tail = (const uint8_t *)(data + nblocks * 16); + uint64_t h1 = seed; + uint64_t h2 = seed; + uint64_t k1; + uint64_t k2; + int i; + + for(i = 0 ; i < nblocks ; i++) { + k1 = getblock64(blocks, i * 2 + 0); + k2 = getblock64(blocks, i * 2 + 1); + k1 *= c1; + k1 = ROTL64(k1, 31); + k1 *= c2; + h1 ^= k1; + h1 = ROTL64(h1, 27); + h1 += h2; + h1 = h1 * 5 + 0x52dce729; + k2 *= c2; + k2 = ROTL64(k2, 33); + k2 *= c1; + h2 ^= k2; + h2 = ROTL64(h2, 31); + h2 += h1; + h2 = h2 * 5 + 0x38495ab5; + } + + k1 = 0; + k2 = 0; + switch(len & 15) { + case 15: + k2 ^= ((uint64_t)tail[14]) << 48; + case 14: + k2 ^= ((uint64_t)tail[13]) << 40; + case 13: + k2 ^= ((uint64_t)tail[12]) << 32; + case 12: + k2 ^= ((uint64_t)tail[11]) << 24; + case 11: + k2 ^= ((uint64_t)tail[10]) << 16; + case 10: + k2 ^= ((uint64_t)tail[9]) << 8; + case 9: + k2 ^= ((uint64_t)tail[8]) << 0; + k2 *= c2; + k2 = ROTL64(k2, 33); + k2 *= c1; + h2 ^= k2; + case 8: + k1 ^= ((uint64_t)tail[7]) << 56; + case 7: + k1 ^= ((uint64_t)tail[6]) << 48; + case 6: + k1 ^= ((uint64_t)tail[5]) << 40; + case 5: + k1 ^= ((uint64_t)tail[4]) << 32; + case 4: + k1 ^= ((uint64_t)tail[3]) << 24; + case 3: + k1 ^= ((uint64_t)tail[2]) << 16; + case 2: + k1 ^= ((uint64_t)tail[1]) << 8; + case 1: + k1 ^= ((uint64_t)tail[0]) << 0; + k1 *= c1; + k1 = ROTL64(k1, 31); + k1 *= c2; + h1 ^= k1; + }; + + h1 ^= len; + h2 ^= len; + h1 += h2; + h2 += h1; + h1 = fmix64(h1); + h2 = fmix64(h2); + h1 += h2; + h2 += h1; + + ((uint64_t*)out)[0] = h1; + ((uint64_t*)out)[1] = h2; +} + diff --git a/murmur3.h b/murmur3.h new file mode 100644 index 0000000..a1b302c --- /dev/null +++ b/murmur3.h @@ -0,0 +1,12 @@ +/* + * based on code written by Austin Appleby and placed in the public domain + */ + +#ifndef MURMUR3_H +#define MURMUR3_H + +#include + +void murmurhash3_x64_128(const void *, const int, const uint32_t, void *); + +#endif diff --git a/test.c b/test.c index d41f041..03df0d3 100644 --- a/test.c +++ b/test.c @@ -25,6 +25,13 @@ main() if (!dht_init(&n, 5000, 5, DHT_USEUDP, NULL)) err(1, "dht_init"); + + dht_add_peer(&n, "127.0.0.1", 6000); + dht_add_peer(&n, "127.0.0.1", 8000); + dht_add_peer(&n, "127.0.0.1", 5000); + dht_add_peer(&n, "127.0.0.1", 7000); + dht_add_peer(&n, "127.0.0.1", 6000); + dht_event_loop(&n); return 0; } diff --git a/test_murmur3.c b/test_murmur3.c new file mode 100644 index 0000000..83a1ff1 --- /dev/null +++ b/test_murmur3.c @@ -0,0 +1,83 @@ +#include +#include + +#include "murmur3.h" + +int +main(int argc, char **argv) +{ + uint64_t hash[2]; + char *testcase0 = ""; + char *testcase1 = "a"; + char *testcase2 = "$!"; + char *testcase3 = "hello world!"; + char *testcase4 = "abracadabra"; + char *testcase5 = "Abracadabra"; + char *testcase6 = "Lorem ipsum dolor sit amet."; + char *testcase7 = "Lorem ipsum dolor sit amet orci aliquam."; + char *testcase8 = "Lorem ipsum dolor sit amet, consectetur adipiscing elit. Suspendisse eu augue quis velit massa nunc."; + char *testcase9 = "Lorem ipsum dolor sit amet, consectetur adipiscing elit. Nunc ultricies quam sit amet odio finibus ultricies. Integer vitae ante libero. Etiam gravida ac purus nec efficitur. Nam faucibus finibus velit, eu pulvinar nibh condimentum sed. Quisque tempor sed."; + + murmurhash3_x64_128(testcase0, strlen(testcase0), 0, hash); + if ((hash[0] != 0x0LLU) || (hash[1] != 0x0LLU)) + printf("FAILED \"%s\": %llx%llx should be %llx%llx\n", testcase0, hash[0], hash[1], 0x0LLU, 0x0LLU); + else + printf("PASSED\n"); + + murmurhash3_x64_128(testcase1, strlen(testcase1), 0, hash); + if ((hash[0] != 0x85555565f6597889LLU) || (hash[1] != 0xe6b53a48510e895aLLU)) + printf("FAILED \"%s\": %llx%llx should be %llx%llx\n", testcase1, hash[0], hash[1], 0x85555565f6597889LLU, 0xe6b53a48510e895aLLU); + else + printf("PASSED\n"); + + murmurhash3_x64_128(testcase2, strlen(testcase2), 0, hash); + if ((hash[0] != 0xbd5f7bc8a4fdd541LLU) || (hash[1] != 0x2638a8ef74dade00LLU)) + printf("FAILED \"%s\": %llx%llx should be %llx%llx\n", testcase2, hash[0], hash[1], 0xbd5f7bc8a4fdd541LLU, 0x2638a8ef74dade00LLU); + else + printf("PASSED\n"); + + murmurhash3_x64_128(testcase3, strlen(testcase3), 0, hash); + if ((hash[0] != 0x5aa80377fe21bbe3LLU) || (hash[1] != 0xedf56b1420cea7e7LLU)) + printf("FAILED \"%s\": %llx%llx should be %llx%llx\n", testcase3, hash[0], hash[1], 0x5aa80377fe21bbe3LLU, 0xedf56b1420cea7e7LLU); + else + printf("PASSED\n"); + + murmurhash3_x64_128(testcase4, strlen(testcase4), 0, hash); + if ((hash[0] != 0x3386097a32a76032LLU) || (hash[1] != 0x216f524eb811e738LLU)) + printf("FAILED \"%s\": %llx%llx should be %llx%llx\n", testcase4, hash[0], hash[1], 0x3386097a32a76032LLU, 0x216f524eb811e738LLU); + else + printf("PASSED\n"); + + murmurhash3_x64_128(testcase5, strlen(testcase5), 0, hash); + if ((hash[0] != 0x84c607dbc50d6e93LLU) || (hash[1] != 0x68532ed845d04979LLU)) + printf("FAILED \"%s\": %llx%llx should be %llx%llx\n", testcase5, hash[0], hash[1], 0x84c607dbc50d6e93LLU, 0x68532ed845d04979LLU); + else + printf("PASSED\n"); + + murmurhash3_x64_128(testcase6, strlen(testcase6), 0, hash); + if ((hash[0] != 0x147b136c7bcdcd01LLU) || (hash[1] != 0x697f1d69b4d42a70LLU)) + printf("FAILED \"%s\": %llx%llx should be %llx%llx\n", testcase6, hash[0], hash[1], 0x147b136c7bcdcd01LLU, 0x697f1d69b4d42a70LLU); + else + printf("PASSED\n"); + + murmurhash3_x64_128(testcase7, strlen(testcase7), 0, hash); + if ((hash[0] != 0x3e19e6296c644dfbLLU) || (hash[1] != 0x91157457fc87a37bLLU)) + printf("FAILED \"%s\": %llx%llx should be %llx%llx\n", testcase7, hash[0], hash[1], 0x3e19e6296c644dfbLLU, 0x91157457fc87a37bLLU); + else + printf("PASSED\n"); + + murmurhash3_x64_128(testcase8, strlen(testcase8), 0, hash); + if ((hash[0] != 0xa1148cf5710c8478LLU) || (hash[1] != 0x1eccb6487c585ed0LLU)) + printf("FAILED \"%s\": %llx%llx should be %llx%llx\n", testcase8, hash[0], hash[1], 0xa1148cf5710c8478LLU, 0x1eccb6487c585ed0LLU); + else + printf("PASSED\n"); + + murmurhash3_x64_128(testcase9, strlen(testcase9), 0, hash); + if ((hash[0] != 0xe3787b529415e368LLU) || (hash[1] != 0xbab4b8be10f4213fLLU)) + printf("FAILED \"%s\": %llx%llx should be %llx%llx\n", testcase9, hash[0], hash[1], 0xe3787b529415e368LLU, 0xbab4b8be10f4213fLLU); + else + printf("PASSED\n"); + + return 0; +} + -- 2.11.4.GIT