2 * 2008+ Copyright (c) Evgeniy Polyakov <zbr@ioremap.net>
5 * This program is free software; you can redistribute it and/or modify
6 * it under the terms of the GNU General Public License as published by
7 * the Free Software Foundation; either version 2 of the License, or
8 * (at your option) any later version.
10 * This program is distributed in the hope that it will be useful,
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 * GNU General Public License for more details.
16 #include <sys/types.h>
18 #include <sys/socket.h>
29 #include "elliptics.h"
31 #include "elliptics/packet.h"
32 #include "elliptics/interface.h"
34 struct dnet_notify_entry
36 struct list_head notify_entry
;
38 struct dnet_net_state
*state
;
41 static unsigned int dnet_notify_hash(struct dnet_id
*id
, unsigned int hash_size
)
43 unsigned int hash
= 0xbb40e64d; /* 3.141592653 */
45 unsigned int *int_id
= (unsigned int *)id
->id
;
47 for (i
=0; i
<DNET_ID_SIZE
/ 4; ++i
)
54 int dnet_update_notify(struct dnet_net_state
*st
, struct dnet_cmd
*cmd
, void *data
)
56 struct dnet_node
*n
= st
->n
;
57 unsigned int hash
= dnet_notify_hash(&cmd
->id
, n
->notify_hash_size
);
58 struct dnet_notify_bucket
*b
= &n
->notify_hash
[hash
];
59 struct dnet_notify_entry
*nt
;
60 struct dnet_io_attr
*io
= data
;
61 struct dnet_io_notification
not;
63 /*if (io->size == sizeof(struct dnet_history_entry)) {
64 struct dnet_history_entry *h = (struct dnet_history_entry *)(io + 1);
66 memcpy(¬.io.id, h->id, DNET_ID_SIZE);
67 memcpy(¬.io.parent, io->parent, DNET_ID_SIZE);
68 not.io.size = h->size;
69 not.io.offset = h->offset;
70 not.io.flags = io->flags;
72 dnet_convert_io_attr(¬.io);
74 memcpy(¬.io
, io
, sizeof(struct dnet_io_attr
));
75 dnet_convert_io_attr(¬.io
);
78 not.addr
.sock_type
= n
->sock_type
;
79 not.addr
.family
= n
->family
;
80 not.addr
.proto
= n
->proto
;
82 pthread_rwlock_rdlock(&b
->notify_lock
);
83 list_for_each_entry(nt
, &b
->notify_list
, notify_entry
) {
84 if (dnet_id_cmp(&cmd
->id
, &nt
->cmd
.id
))
87 memcpy(¬.addr
.addr
, &st
->addr
, sizeof(struct dnet_addr
));
89 dnet_log(n
, DNET_LOG_NOTICE
, "%s: sending notification.\n", dnet_dump_id(&cmd
->id
));
90 dnet_send_reply(nt
->state
, &nt
->cmd
, ¬, sizeof(struct dnet_io_notification
), 1);
92 pthread_rwlock_unlock(&b
->notify_lock
);
97 static void dnet_notify_entry_destroy(struct dnet_notify_entry
*e
)
99 dnet_state_put(e
->state
);
103 int dnet_notify_add(struct dnet_net_state
*st
, struct dnet_cmd
*cmd
)
105 struct dnet_node
*n
= st
->n
;
106 struct dnet_notify_entry
*e
;
107 unsigned int hash
= dnet_notify_hash(&cmd
->id
, n
->notify_hash_size
);
108 struct dnet_notify_bucket
*b
= &n
->notify_hash
[hash
];
110 e
= malloc(sizeof(struct dnet_notify_entry
));
114 e
->state
= dnet_state_get(st
);
115 memcpy(&e
->cmd
, cmd
, sizeof(struct dnet_cmd
));
117 pthread_rwlock_wrlock(&b
->notify_lock
);
118 list_add_tail(&e
->notify_entry
, &b
->notify_list
);
119 pthread_rwlock_unlock(&b
->notify_lock
);
121 dnet_log(n
, DNET_LOG_INFO
, "%s: added notification, hash: %x.\n", dnet_dump_id(&cmd
->id
), hash
);
126 int dnet_notify_remove(struct dnet_net_state
*st
, struct dnet_cmd
*cmd
)
128 struct dnet_node
*n
= st
->n
;
129 struct dnet_notify_entry
*e
, *tmp
;
130 unsigned int hash
= dnet_notify_hash(&cmd
->id
, n
->notify_hash_size
);
131 struct dnet_notify_bucket
*b
= &n
->notify_hash
[hash
];
134 pthread_rwlock_wrlock(&b
->notify_lock
);
135 list_for_each_entry_safe(e
, tmp
, &b
->notify_list
, notify_entry
) {
136 if (dnet_id_cmp(&e
->cmd
.id
, &cmd
->id
))
140 err
= dnet_send_reply(e
->state
, &e
->cmd
, NULL
, 0, 0);
142 list_del(&e
->notify_entry
);
143 dnet_notify_entry_destroy(e
);
145 dnet_log(n
, DNET_LOG_INFO
, "%s: removed notification.\n", dnet_dump_id(&cmd
->id
));
148 pthread_rwlock_unlock(&b
->notify_lock
);
153 int dnet_notify_init(struct dnet_node
*n
)
156 struct dnet_notify_bucket
*b
;
159 n
->notify_hash
= malloc(sizeof(struct dnet_notify_bucket
) * n
->notify_hash_size
);
160 if (!n
->notify_hash
) {
161 errno
= ENOMEM
; /* Linux does not set errno when malloc fails */
162 dnet_log_err(n
, "Failed to allocate %u notify hash buckets", n
->notify_hash_size
);
167 for (i
=0; i
<n
->notify_hash_size
; ++i
) {
168 b
= &n
->notify_hash
[i
];
170 INIT_LIST_HEAD(&b
->notify_list
);
171 err
= pthread_rwlock_init(&b
->notify_lock
, NULL
);
174 dnet_log_err(n
, "Failed to initialize %d'th bucket lock: err: %d", i
, err
);
179 dnet_log(n
, DNET_LOG_INFO
, "Successfully initialized notify hash table (%u entries).\n",
180 n
->notify_hash_size
);
185 n
->notify_hash_size
= i
;
186 for (i
=0; i
<n
->notify_hash_size
; ++i
) {
187 b
= &n
->notify_hash
[i
];
188 pthread_rwlock_destroy(&b
->notify_lock
);
190 free(n
->notify_hash
);
195 void dnet_notify_exit(struct dnet_node
*n
)
198 struct dnet_notify_bucket
*b
;
199 struct dnet_notify_entry
*e
, *tmp
;
201 for (i
=0; i
<n
->notify_hash_size
; ++i
) {
202 b
= &n
->notify_hash
[i
];
204 pthread_rwlock_wrlock(&b
->notify_lock
);
205 list_for_each_entry_safe(e
, tmp
, &b
->notify_list
, notify_entry
) {
206 list_del(&e
->notify_entry
);
208 dnet_notify_entry_destroy(e
);
210 pthread_rwlock_unlock(&b
->notify_lock
);
212 pthread_rwlock_destroy(&b
->notify_lock
);
214 free(n
->notify_hash
);