4 * Copyright (c) 2011, Dan Magenheimer, Oracle Corp.
6 * Ramster_r2net provides an interface between zcache and r2net.
8 * FIXME: support more than two nodes
11 #include <linux/list.h>
12 #include "cluster/tcp.h"
13 #include "cluster/nodemanager.h"
18 #define RAMSTER_TESTING
20 #define RMSTR_KEY 0x77347734
23 RMSTR_TMEM_PUT_EPH
= 100,
25 RMSTR_TMEM_ASYNC_GET_REQUEST
,
26 RMSTR_TMEM_ASYNC_GET_AND_FREE_REQUEST
,
27 RMSTR_TMEM_ASYNC_GET_REPLY
,
30 RMSTR_TMEM_DESTROY_POOL
,
/*
 * Largest data length that fits in one ramster message: the r2net payload
 * limit less the struct tmem_xhandle that is sent ahead of the data.
 */
#define RMSTR_R2NET_MAX_LEN \
		(R2NET_MAX_PAYLOAD_BYTES - sizeof(struct tmem_xhandle))
36 #include "cluster/tcp_internal.h"
/*
 * Node used as the destination by ramster_remote_put(), together with the
 * node number it was looked up by.  Both are written by
 * r2net_remote_target_node_set().
 * NOTE(review): the setter calls r2nm_node_put() on the node right after
 * storing it, so this pointer appears to be kept without a held
 * reference - confirm that this is intended.
 */
static struct r2nm_node *r2net_target_node;
static int r2net_target_nodenum;
41 int r2net_remote_target_node_set(int node_num
)
45 r2net_target_node
= r2nm_get_node_by_num(node_num
);
46 if (r2net_target_node
!= NULL
) {
47 r2net_target_nodenum
= node_num
;
48 r2nm_node_put(r2net_target_node
);
54 /* FIXME following buffer should be per-cpu, protected by preempt_disable */
55 static char ramster_async_get_buf
[R2NET_MAX_PAYLOAD_BYTES
];
57 static int ramster_remote_async_get_request_handler(struct r2net_msg
*msg
,
58 u32 len
, void *data
, void **ret_data
)
61 struct tmem_xhandle xh
;
63 size_t size
= RMSTR_R2NET_MAX_LEN
;
64 u16 msgtype
= be16_to_cpu(msg
->msg_type
);
65 bool get_and_free
= (msgtype
== RMSTR_TMEM_ASYNC_GET_AND_FREE_REQUEST
);
68 xh
= *(struct tmem_xhandle
*)msg
->buf
;
69 if (xh
.xh_data_size
> RMSTR_R2NET_MAX_LEN
)
71 pdata
= ramster_async_get_buf
;
72 *(struct tmem_xhandle
*)pdata
= xh
;
73 pdata
+= sizeof(struct tmem_xhandle
);
74 local_irq_save(flags
);
75 found
= zcache_get(xh
.client_id
, xh
.pool_id
, &xh
.oid
, xh
.index
,
76 pdata
, &size
, 1, get_and_free
? 1 : -1);
77 local_irq_restore(flags
);
79 /* a zero size indicates the get failed */
82 if (size
> RMSTR_R2NET_MAX_LEN
)
84 *ret_data
= pdata
- sizeof(struct tmem_xhandle
);
85 /* now make caller (r2net_process_message) handle specially */
86 r2net_force_data_magic(msg
, RMSTR_TMEM_ASYNC_GET_REPLY
, RMSTR_KEY
);
87 return size
+ sizeof(struct tmem_xhandle
);
90 static int ramster_remote_async_get_reply_handler(struct r2net_msg
*msg
,
91 u32 len
, void *data
, void **ret_data
)
93 char *in
= (char *)msg
->buf
;
94 int datalen
= len
- sizeof(struct r2net_msg
);
96 struct tmem_xhandle
*xh
= (struct tmem_xhandle
*)in
;
98 in
+= sizeof(struct tmem_xhandle
);
99 datalen
-= sizeof(struct tmem_xhandle
);
100 BUG_ON(datalen
< 0 || datalen
> PAGE_SIZE
);
101 ret
= zcache_localify(xh
->pool_id
, &xh
->oid
, xh
->index
,
102 in
, datalen
, xh
->extra
);
103 #ifdef RAMSTER_TESTING
105 pr_err("TESTING ArrgREP, aborted overwrite on racy put\n");
110 int ramster_remote_put_handler(struct r2net_msg
*msg
,
111 u32 len
, void *data
, void **ret_data
)
113 struct tmem_xhandle
*xh
;
114 char *p
= (char *)msg
->buf
;
115 int datalen
= len
- sizeof(struct r2net_msg
) -
116 sizeof(struct tmem_xhandle
);
117 u16 msgtype
= be16_to_cpu(msg
->msg_type
);
118 bool ephemeral
= (msgtype
== RMSTR_TMEM_PUT_EPH
);
122 xh
= (struct tmem_xhandle
*)p
;
123 p
+= sizeof(struct tmem_xhandle
);
124 zcache_autocreate_pool(xh
->client_id
, xh
->pool_id
, ephemeral
);
125 local_irq_save(flags
);
126 ret
= zcache_put(xh
->client_id
, xh
->pool_id
, &xh
->oid
, xh
->index
,
127 p
, datalen
, 1, ephemeral
? 1 : -1);
128 local_irq_restore(flags
);
132 int ramster_remote_flush_handler(struct r2net_msg
*msg
,
133 u32 len
, void *data
, void **ret_data
)
135 struct tmem_xhandle
*xh
;
136 char *p
= (char *)msg
->buf
;
138 xh
= (struct tmem_xhandle
*)p
;
139 p
+= sizeof(struct tmem_xhandle
);
140 (void)zcache_flush(xh
->client_id
, xh
->pool_id
, &xh
->oid
, xh
->index
);
144 int ramster_remote_flobj_handler(struct r2net_msg
*msg
,
145 u32 len
, void *data
, void **ret_data
)
147 struct tmem_xhandle
*xh
;
148 char *p
= (char *)msg
->buf
;
150 xh
= (struct tmem_xhandle
*)p
;
151 p
+= sizeof(struct tmem_xhandle
);
152 (void)zcache_flush_object(xh
->client_id
, xh
->pool_id
, &xh
->oid
);
156 int ramster_remote_async_get(struct tmem_xhandle
*xh
, bool free
, int remotenode
,
157 size_t expect_size
, uint8_t expect_cksum
,
160 int ret
= -1, status
;
161 struct r2nm_node
*node
= NULL
;
166 node
= r2nm_get_node_by_num(remotenode
);
169 xh
->client_id
= r2nm_this_node(); /* which node is getting */
170 xh
->xh_data_cksum
= expect_cksum
;
171 xh
->xh_data_size
= expect_size
;
173 vec
[0].iov_len
= sizeof(*xh
);
174 vec
[0].iov_base
= xh
;
176 msg_type
= RMSTR_TMEM_ASYNC_GET_AND_FREE_REQUEST
;
178 msg_type
= RMSTR_TMEM_ASYNC_GET_REQUEST
;
179 ret
= r2net_send_message_vec(msg_type
, RMSTR_KEY
,
180 vec
, veclen
, remotenode
, &status
);
183 /* FIXME handle bad message possibilities here? */
184 pr_err("UNTESTED ret<0 in ramster_remote_async_get\n");
191 #ifdef RAMSTER_TESTING
192 /* leave me here to see if it catches a weird crash */
193 static void ramster_check_irq_counts(void)
195 static int last_hardirq_cnt
, last_softirq_cnt
, last_preempt_cnt
;
196 int cur_hardirq_cnt
, cur_softirq_cnt
, cur_preempt_cnt
;
198 cur_hardirq_cnt
= hardirq_count() >> HARDIRQ_SHIFT
;
199 if (cur_hardirq_cnt
> last_hardirq_cnt
) {
200 last_hardirq_cnt
= cur_hardirq_cnt
;
201 if (!(last_hardirq_cnt
&(last_hardirq_cnt
-1)))
202 pr_err("RAMSTER TESTING RRP hardirq_count=%d\n",
205 cur_softirq_cnt
= softirq_count() >> SOFTIRQ_SHIFT
;
206 if (cur_softirq_cnt
> last_softirq_cnt
) {
207 last_softirq_cnt
= cur_softirq_cnt
;
208 if (!(last_softirq_cnt
&(last_softirq_cnt
-1)))
209 pr_err("RAMSTER TESTING RRP softirq_count=%d\n",
212 cur_preempt_cnt
= preempt_count() & PREEMPT_MASK
;
213 if (cur_preempt_cnt
> last_preempt_cnt
) {
214 last_preempt_cnt
= cur_preempt_cnt
;
215 if (!(last_preempt_cnt
&(last_preempt_cnt
-1)))
216 pr_err("RAMSTER TESTING RRP preempt_count=%d\n",
222 int ramster_remote_put(struct tmem_xhandle
*xh
, char *data
, size_t size
,
223 bool ephemeral
, int *remotenode
)
225 int nodenum
, ret
= -1, status
;
226 struct r2nm_node
*node
= NULL
;
230 #ifdef RAMSTER_TESTING
231 struct r2net_node
*nn
;
234 BUG_ON(size
> RMSTR_R2NET_MAX_LEN
);
235 xh
->client_id
= r2nm_this_node(); /* which node is putting */
236 vec
[0].iov_len
= sizeof(*xh
);
237 vec
[0].iov_base
= xh
;
238 vec
[1].iov_len
= size
;
239 vec
[1].iov_base
= data
;
240 node
= r2net_target_node
;
244 nodenum
= r2net_target_nodenum
;
248 #ifdef RAMSTER_TESTING
249 nn
= r2net_nn_from_num(nodenum
);
250 WARN_ON_ONCE(nn
->nn_persistent_error
|| !nn
->nn_sc_valid
);
254 msg_type
= RMSTR_TMEM_PUT_EPH
;
256 msg_type
= RMSTR_TMEM_PUT_PERS
;
257 #ifdef RAMSTER_TESTING
258 /* leave me here to see if it catches a weird crash */
259 ramster_check_irq_counts();
262 ret
= r2net_send_message_vec(msg_type
, RMSTR_KEY
, vec
, veclen
,
264 #ifdef RAMSTER_TESTING
266 static unsigned long cnt
;
269 pr_err("ramster_remote_put: message failed, ret=%d, cnt=%lu\n",
278 *remotenode
= nodenum
;
286 int ramster_remote_flush(struct tmem_xhandle
*xh
, int remotenode
)
288 int ret
= -1, status
;
289 struct r2nm_node
*node
= NULL
;
293 node
= r2nm_get_node_by_num(remotenode
);
294 BUG_ON(node
== NULL
);
295 xh
->client_id
= r2nm_this_node(); /* which node is flushing */
296 vec
[0].iov_len
= sizeof(*xh
);
297 vec
[0].iov_base
= xh
;
298 BUG_ON(irqs_disabled());
299 BUG_ON(in_softirq());
300 ret
= r2net_send_message_vec(RMSTR_TMEM_FLUSH
, RMSTR_KEY
,
301 vec
, veclen
, remotenode
, &status
);
306 int ramster_remote_flush_object(struct tmem_xhandle
*xh
, int remotenode
)
308 int ret
= -1, status
;
309 struct r2nm_node
*node
= NULL
;
313 node
= r2nm_get_node_by_num(remotenode
);
314 BUG_ON(node
== NULL
);
315 xh
->client_id
= r2nm_this_node(); /* which node is flobjing */
316 vec
[0].iov_len
= sizeof(*xh
);
317 vec
[0].iov_base
= xh
;
318 ret
= r2net_send_message_vec(RMSTR_TMEM_FLOBJ
, RMSTR_KEY
,
319 vec
, veclen
, remotenode
, &status
);
325 * Handler registration
/*
 * List handed to every r2net_register_handler() call in this file and
 * later passed to r2net_unregister_handler_list() to undo them together.
 */
static LIST_HEAD(r2net_unreg_list);
330 static void r2net_unregister_handlers(void)
332 r2net_unregister_handler_list(&r2net_unreg_list
);
335 int r2net_register_handlers(void)
339 status
= r2net_register_handler(RMSTR_TMEM_PUT_EPH
, RMSTR_KEY
,
341 ramster_remote_put_handler
,
342 NULL
, NULL
, &r2net_unreg_list
);
346 status
= r2net_register_handler(RMSTR_TMEM_PUT_PERS
, RMSTR_KEY
,
348 ramster_remote_put_handler
,
349 NULL
, NULL
, &r2net_unreg_list
);
353 status
= r2net_register_handler(RMSTR_TMEM_ASYNC_GET_REQUEST
, RMSTR_KEY
,
355 ramster_remote_async_get_request_handler
,
361 status
= r2net_register_handler(RMSTR_TMEM_ASYNC_GET_AND_FREE_REQUEST
,
362 RMSTR_KEY
, RMSTR_R2NET_MAX_LEN
,
363 ramster_remote_async_get_request_handler
,
369 status
= r2net_register_handler(RMSTR_TMEM_ASYNC_GET_REPLY
, RMSTR_KEY
,
371 ramster_remote_async_get_reply_handler
,
377 status
= r2net_register_handler(RMSTR_TMEM_FLUSH
, RMSTR_KEY
,
379 ramster_remote_flush_handler
,
385 status
= r2net_register_handler(RMSTR_TMEM_FLOBJ
, RMSTR_KEY
,
387 ramster_remote_flobj_handler
,
393 pr_info("ramster: r2net handlers registered\n");
397 r2net_unregister_handlers();
398 pr_err("ramster: couldn't register r2net handlers\n");