From 73b0e734b7e6e60457b51576e7eb296b21182f05 Mon Sep 17 00:00:00 2001 From: Michael Blizek Date: Fri, 8 Oct 2010 07:36:44 +0200 Subject: [PATCH] initial seqno --- net/cor/common.c | 25 +++++++++++++++++++++--- net/cor/cor.h | 18 +++++++++-------- net/cor/forward.c | 4 ++-- net/cor/kpacket_gen.c | 51 +++++++++++++++++++++++++++++++++---------------- net/cor/kpacket_parse.c | 41 +++++++++++++++++---------------------- net/cor/rcv.c | 19 ++++++++++-------- net/cor/settings.h | 1 + net/cor/snd.c | 7 ++++--- 8 files changed, 102 insertions(+), 64 deletions(-) diff --git a/net/cor/common.c b/net/cor/common.c index 2d52c886bbc..6a4e2628de4 100644 --- a/net/cor/common.c +++ b/net/cor/common.c @@ -452,7 +452,6 @@ int conn_init_out(struct conn *rconn, struct neighbor *nb) rconn->target.out.stall_timeout_ms = stall_timeout_ms; skb_queue_head_init(&(sconn->source.in.reorder_queue)); atomic_set(&(sconn->source.in.pong_awaiting), 0); - atomic_set(&(sconn->source.in.usage_reserve), 0); if (unlikely(connid_alloc(sconn))) { rc = 1; @@ -461,7 +460,9 @@ int conn_init_out(struct conn *rconn, struct neighbor *nb) INIT_LIST_HEAD(&(rconn->target.out.retrans_list)); - reset_seqno(&(rconn->buf)); + reset_seqno(&(rconn->buf), 0); + get_random_bytes((char *) &(sconn->source.in.next_seqno), + sizeof(sconn->source.in.next_seqno)); mutex_lock(&(nb->conn_list_lock)); list_add_tail(&(sconn->source.in.nb_list), &(nb->rcv_conn_list)); @@ -493,7 +494,7 @@ void conn_init_sock_target(struct conn *conn) conn->targettype = TARGET_SOCK; memset(&(conn->target.sock), 0, sizeof(conn->target.sock)); init_waitqueue_head(&(conn->target.sock.wait)); - reset_seqno(&(conn->buf)); + reset_seqno(&(conn->buf), 0); } struct conn* alloc_conn(gfp_t allocflags) @@ -707,6 +708,7 @@ int connect_neigh(struct conn *rconn, } send_connect_nb(cm, rconn->reversedir->source.in.conn_id, + rconn->reversedir->source.in.next_seqno, rconn->reversedir); neigh_kref: @@ -715,6 +717,21 @@ neigh_kref: return rc; } +void reset_ping(struct conn *rconn) +{ + struct neighbor *nb = rconn->source.in.nb; + if (atomic_read(&(rconn->source.in.pong_awaiting)) != 0) { + mutex_lock(&(nb->conn_list_lock)); + if (atomic_read(&(rconn->source.in.pong_awaiting)) == 0) + goto unlock; + + atomic_set(&(rconn->source.in.pong_awaiting), 0); + nb->pong_conns_expected--; +unlock: + mutex_unlock(&(nb->conn_list_lock)); + } +} + static int _reset_conn(struct conn *conn) { /* @@ -729,6 +746,8 @@ static int _reset_conn(struct conn *conn) if (isreset == 2 || isreset == 3) return 0; + reset_ping(conn); + /* lock sourcetype/targettype */ mutex_lock(&(conn->rcv_lock)); diff --git a/net/cor/cor.h b/net/cor/cor.h index 2296062007b..1c5e55127ae 100644 --- a/net/cor/cor.h +++ b/net/cor/cor.h @@ -108,13 +108,13 @@ struct cor_sockaddr { /* * NOTE on connection ids: - * connection ids we send are used for the receive channel - * connection ids we receive are used for the send channel + * connection ids + init seqnos we send are used for the receive channel + * connection ids + init seqnos we receive are used for the send channel */ /* * incoming connection - * KP_CONNECT[1] conn_id[4] window[1] + * KP_CONNECT[1] conn_id[4] init_seqno[4] window[1] */ #define KP_CONNECT 7 @@ -122,7 +122,7 @@ struct cor_sockaddr { * incoming connection successful, * the first conn_id is the same as previously sent/received in KP_CONNECT * the second conn_id is generated by us and used for the other direction - * KP_CONNECT_SUCCESS[1] conn_id[4] conn_id[4] window[1] + * KP_CONNECT_SUCCESS[1] conn_id[4] conn_id[4] init_seqno[4] window[1] */ #define KP_CONNECT_SUCCESS 8 @@ -592,7 +592,7 @@ struct conn{ __u32 usage_init; __u32 usage_speed; __u32 usage_ata; - atomic_t usage_reserve; + __u32 usage_reserve; struct speedtracker st; @@ -720,6 +720,8 @@ extern int connect_neigh(struct conn *rconn, extern struct conn* alloc_conn(gfp_t allocflags); +extern void reset_ping(struct conn *rconn); + extern void reset_conn(struct conn *conn); /* credits.c */ @@ -822,10 +824,10 @@ extern void send_ack_conn_ooo(struct control_msg_out *cm, struct conn *rconn, __u32 conn_id, __u32 seqno, __u32 seqno_ooo, __u32 length); extern void send_connect_success(struct control_msg_out *cm, __u32 rcvd_conn_id, - __u32 gen_conn_id, struct conn *rconn); + __u32 gen_conn_id, __u32 init_seqno, struct conn *rconn); extern void send_connect_nb(struct control_msg_out *cm, __u32 conn_id, - struct conn *sconn); + __u32 init_seqno, struct conn *sconn); extern void send_conndata(struct control_msg_out *cm, __u32 conn_id, __u32 seqno, char *data_orig, char *data, __u32 datalen); @@ -888,7 +890,7 @@ extern void databuf_ackread(struct conn *rconn); extern int databuf_maypush(struct data_buf *buf); -extern void reset_seqno(struct data_buf *buf); +extern void reset_seqno(struct data_buf *buf, __u32 initseqno); extern void databuf_free(struct data_buf *data); diff --git a/net/cor/forward.c b/net/cor/forward.c index 9e71fee115a..2814e6ed868 100644 --- a/net/cor/forward.c +++ b/net/cor/forward.c @@ -47,9 +47,9 @@ static void databuf_item_free(struct data_buf_item *item) kmem_cache_free(data_buf_item_slab, item); } -void reset_seqno(struct data_buf *buf) +void reset_seqno(struct data_buf *buf, __u32 initseqno) { - buf->first_offset = 0 - buf->last_read_offset; + buf->first_offset = initseqno - buf->last_read_offset; } void databuf_free(struct data_buf *data) diff --git a/net/cor/kpacket_gen.c b/net/cor/kpacket_gen.c index a83ee53c6f2..814f8c99f96 100644 --- a/net/cor/kpacket_gen.c +++ b/net/cor/kpacket_gen.c @@ -71,12 +71,14 @@ struct control_msg_out{ struct{ __u32 conn_id; + __u32 init_seqno; struct conn *sconn; }connect; struct{ __u32 rcvd_conn_id; __u32 gen_conn_id; + __u32 init_seqno; struct conn *rconn; }connect_success; @@ -212,14 +214,23 @@ void free_control_msg(struct control_msg_out *cm) atomic_dec(&(cmcnt)); } - if (cm->type == MSGTYPE_ACK_CONN) + if (cm->type == MSGTYPE_ACK_CONN) { + BUG_ON(cm->msg.ack_conn.rconn == 0); kref_put(&(cm->msg.ack_conn.rconn->ref), free_conn); - else if (cm->type == MSGTYPE_ACK_CONN_OOO) + cm->msg.ack_conn.rconn = 0; + } else if (cm->type == MSGTYPE_ACK_CONN_OOO) { + BUG_ON(cm->msg.ack_conn_ooo.rconn == 0); kref_put(&(cm->msg.ack_conn_ooo.rconn->ref), free_conn); - else if (cm->type == MSGTYPE_CONNECT) + cm->msg.ack_conn_ooo.rconn = 0; + } else if (cm->type == MSGTYPE_CONNECT) { + BUG_ON(cm->msg.connect.sconn == 0); kref_put(&(cm->msg.connect.sconn->ref), free_conn); - else if (cm->type == MSGTYPE_CONNECT_SUCCESS) + cm->msg.connect.sconn = 0; + } else if (cm->type == MSGTYPE_CONNECT_SUCCESS) { + BUG_ON(cm->msg.connect_success.rconn == 0); kref_put(&(cm->msg.connect_success.rconn->ref), free_conn); + cm->msg.connect_success.rconn = 0; + } kmem_cache_free(controlmsg_slab, cm); } @@ -433,6 +444,7 @@ static int add_ack_conn(struct sk_buff *skb, struct control_retrans *cr, dst[0] = KP_ACK_CONN; put_u32(dst + 1, cm->msg.ack_conn.conn_id, 1); put_u32(dst + 5, cm->msg.ack_conn.seqno, 1); + BUG_ON(cm->msg.ack_conn.rconn == 0); dst[9] = enc_window(get_window(cm->msg.ack_conn.rconn)); list_add_tail(&(cm->lh), &(cr->msgs)); @@ -454,6 +466,7 @@ static int add_ack_conn_ooo(struct sk_buff *skb, struct control_retrans *cr, dst[0] = KP_ACK_CONN_OOO; put_u32(dst + 1, cm->msg.ack_conn_ooo.conn_id, 1); put_u32(dst + 5, cm->msg.ack_conn_ooo.seqno, 1); + BUG_ON(cm->msg.ack_conn_ooo.rconn == 0); dst[9] = enc_window(get_window(cm->msg.ack_conn_ooo.rconn)); put_u32(dst + 10, cm->msg.ack_conn_ooo.seqno_ooo, 1); put_u32(dst + 14, cm->msg.ack_conn_ooo.length, 1); @@ -503,21 +516,22 @@ static int add_pong(struct sk_buff *skb, struct control_retrans *cr, return 9; } -#warning todo initial seqno static int add_connect(struct sk_buff *skb, struct control_retrans *cr, struct control_msg_out *cm, int spaceleft) { char *dst; - if (unlikely(spaceleft < 6)) + if (unlikely(spaceleft < 10)) return 0; - dst = skb_put(skb, 6); + dst = skb_put(skb, 10); BUG_ON(dst == 0); dst[0] = KP_CONNECT; put_u32(dst + 1, cm->msg.connect.conn_id, 1); - dst[5] = enc_window(get_window(cm->msg.connect.sconn)); + put_u32(dst + 5, cm->msg.connect.init_seqno, 1); + BUG_ON(cm->msg.connect.sconn == 0); + dst[9] = enc_window(get_window(cm->msg.connect.sconn)); list_add_tail(&(cm->lh), &(cr->msgs)); @@ -529,16 +543,18 @@ static int add_connect_success(struct sk_buff *skb, struct control_retrans *cr, { char *dst; - if (unlikely(spaceleft < 10)) + if (unlikely(spaceleft < 14)) return 0; - dst = skb_put(skb, 10); + dst = skb_put(skb, 14); BUG_ON(dst == 0); dst[0] = KP_CONNECT_SUCCESS; put_u32(dst + 1, cm->msg.connect_success.rcvd_conn_id, 1); put_u32(dst + 5, cm->msg.connect_success.gen_conn_id, 1); - dst[9] = enc_window(get_window(cm->msg.connect_success.rconn)); + put_u32(dst + 9, cm->msg.connect_success.init_seqno, 1); + BUG_ON(cm->msg.connect_success.rconn == 0); + dst[13] = enc_window(get_window(cm->msg.connect_success.rconn)); list_add_tail(&(cm->lh), &(cr->msgs)); @@ -1295,7 +1311,8 @@ static void add_control_msg(struct control_msg_out *cm, int retrans) BUG_ON(msgs <= 0); if (unlikely(retrans)) { - if (msgs > MAX_URGENT_CMSGS_PER_NEIGH) { + if (msgs > MAX_URGENT_CMSGS_PER_NEIGH_RETRANSALLOW || + msgs > MAX_URGENT_CMSGS_PER_NEIGH) { atomic_dec(&(cm->nb->ucmcnt)); free_control_msg(cm); goto out; @@ -1389,25 +1406,27 @@ void send_ack_conn_ooo(struct control_msg_out *cm, struct conn *rconn, } void send_connect_success(struct control_msg_out *cm, __u32 rcvd_conn_id, - __u32 gen_conn_id, struct conn *rconn) + __u32 gen_conn_id, __u32 init_seqno, struct conn *rconn) { cm->type = MSGTYPE_CONNECT_SUCCESS; cm->msg.connect_success.rcvd_conn_id = rcvd_conn_id; cm->msg.connect_success.gen_conn_id = gen_conn_id; + cm->msg.connect_success.init_seqno = init_seqno; kref_get(&(rconn->ref)); cm->msg.connect_success.rconn = rconn; - cm->length = 10; + cm->length = 14; add_control_msg(cm, 0); } void send_connect_nb(struct control_msg_out *cm, __u32 conn_id, - struct conn *sconn) + __u32 init_seqno, struct conn *sconn) { cm->type = MSGTYPE_CONNECT; cm->msg.connect.conn_id = conn_id; + cm->msg.connect.init_seqno = init_seqno; kref_get(&(sconn->ref)); cm->msg.connect.sconn = sconn; - cm->length = 6; + cm->length = 10; add_control_msg(cm, 0); } diff --git a/net/cor/kpacket_parse.c b/net/cor/kpacket_parse.c index 9085120c249..9edfd343ae6 100644 --- a/net/cor/kpacket_parse.c +++ b/net/cor/kpacket_parse.c @@ -85,21 +85,6 @@ static __u8 pull_u8(struct sk_buff *skb) return *ptr; } -static void pong_rcvd(struct conn *rconn) -{ - struct neighbor *nb = rconn->source.in.nb; - if (atomic_read(&(rconn->source.in.pong_awaiting)) != 0) { - mutex_lock(&(nb->conn_list_lock)); - if (atomic_read(&(rconn->source.in.pong_awaiting)) == 0) - goto unlock; - - atomic_set(&(rconn->source.in.pong_awaiting), 0); - nb->pong_conns_expected--; -unlock: - mutex_unlock(&(nb->conn_list_lock)); - } -} - static void ping_all_conns(struct neighbor *nb) { struct conn *rconn; @@ -141,7 +126,7 @@ static void parse_ack_conn(struct neighbor *nb, struct sk_buff *skb, __u32 seqno = pull_u32(skb, 1); __u8 window = pull_u8(skb); - pong_rcvd(rconn); + reset_ping(rconn); conn_ack_rcvd(kpacket_seqno, rconn->reversedir, seqno, window, 0, 0); } @@ -162,7 +147,7 @@ static void parse_ack_conn_ooo(struct neighbor *nb, struct sk_buff *skb, __u32 seqno_ooo = pull_u32(skb, 1); __u32 length = pull_u32(skb, 1); - pong_rcvd(rconn); + reset_ping(rconn); conn_ack_rcvd(kpacket_seqno, rconn->reversedir, seqno, window, seqno_ooo, length); @@ -187,6 +172,7 @@ static void parse_conn_success(struct neighbor *nb, struct sk_buff *skb, struct conn *sconn = rconn->reversedir; __u32 conn_id = pull_u32(skb, 1); + __u32 init_seqno = pull_u32(skb, 1); __u8 window = pull_u8(skb); BUG_ON(sconn == 0); @@ -206,6 +192,10 @@ static void parse_conn_success(struct neighbor *nb, struct sk_buff *skb, if (unlikely(atomic_read(&(sconn->isreset)) != 0)) goto reset; + sconn->target.out.seqno_nextsend = init_seqno; + sconn->target.out.seqno_acked = init_seqno; + reset_seqno(&(sconn->buf), init_seqno); + sconn->target.out.seqno_windowlimit = sconn->target.out.seqno_nextsend + dec_window(window); @@ -230,7 +220,6 @@ reset: static void parse_reset(struct neighbor *nb, struct sk_buff *skb, __u32 seqno, struct conn *rconn) { - #warning todo ping conn waiting? atomic_cmpxchg(&(rconn->reversedir->isreset), 0, 1); reset_conn(rconn); } @@ -305,6 +294,7 @@ static void parse_connect(struct neighbor *nb, struct sk_buff *skb) { struct conn *rconn; __u32 conn_id = pull_u32(skb, 1); + __u32 init_seqno = pull_u32(skb, 1); __u8 window = pull_u8(skb); struct control_msg_out *cm = alloc_control_msg(nb, ACM_PRIORITY_HIGH); @@ -319,16 +309,20 @@ static void parse_connect(struct neighbor *nb, struct sk_buff *skb) if (unlikely(conn_init_out(rconn->reversedir, nb))) goto err; - #warning kref??? - rconn->reversedir->target.out.conn_id = conn_id; + + rconn->reversedir->target.out.seqno_nextsend = init_seqno; + rconn->reversedir->target.out.seqno_acked = init_seqno; + reset_seqno(&(rconn->reversedir->buf), init_seqno); + rconn->reversedir->target.out.seqno_windowlimit = rconn->reversedir->target.out.seqno_nextsend + dec_window(window); insert_reverse_connid(rconn->reversedir); send_connect_success(cm, rconn->reversedir->target.out.conn_id, - rconn->source.in.conn_id, rconn); + rconn->source.in.conn_id, rconn->source.in.next_seqno, + rconn); if (0) { err: @@ -425,7 +419,6 @@ static void kernel_packet2(struct neighbor *nb, struct sk_buff *skb, if (conn != 0) { BUG_ON(conn->reversedir->sourcetype != SOURCE_IN); - pong_rcvd(conn->reversedir); atomic_cmpxchg(&(conn->isreset), 0, 1); reset_conn(conn); conn = 0; @@ -503,11 +496,11 @@ void kernel_packet(struct neighbor *nb, struct sk_buff *skb, __u32 seqno) goto discard; break; case KP_CONNECT: - if (cor_pull_skb(skb2, 5) == 0) + if (cor_pull_skb(skb2, 10) == 0) goto discard; break; case KP_CONNECT_SUCCESS: - if (cor_pull_skb(skb2, 9) == 0) + if (cor_pull_skb(skb2, 14) == 0) goto discard; break; case KP_CONN_DATA: diff --git a/net/cor/rcv.c b/net/cor/rcv.c index b21454761c7..1c5516818d0 100644 --- a/net/cor/rcv.c +++ b/net/cor/rcv.c @@ -261,14 +261,18 @@ static void _outofbufferspace(void) int i; + mutex_lock(&(conn->rcv_lock)); + BUG_ON(conn->sourcetype != SOURCE_IN); - usage = atomic_read(&(conn->source.in.usage_reserve)); + usage = conn->source.in.usage_reserve; spin_lock_irqsave(&st_lock, iflags); speed = get_speed(&(conn->source.in.st), jiffies); spin_unlock_irqrestore(&st_lock, iflags); + mutex_unlock(&(conn->rcv_lock)); + if (offendingconns[OUTOFBUFFERSPACE_OFFENDERS-1] != 0 && compare_scores( offendingusage[OUTOFBUFFERSPACE_OFFENDERS-1], @@ -366,7 +370,7 @@ static void refresh_bufferusage(struct conn *rconn) bufferusage_init -= rconn->source.in.usage_init; bufferusage_speed -= rconn->source.in.usage_speed; bufferusage_ata -= rconn->source.in.usage_ata; - bufferusage_reserve -= atomic_read(&(rconn->source.in.usage_reserve)); + bufferusage_reserve -= rconn->source.in.usage_reserve; rconn->source.in.usage_ata = rconn->buf.totalsize; if (rconn->source.in.usage_ata > rconn->source.in.buffer_ata) @@ -397,18 +401,17 @@ static void refresh_bufferusage(struct conn *rconn) if ((rconn->source.in.usage_ata + rconn->source.in.usage_speed + rconn->source.in.usage_init) == rconn->buf.totalsize) - atomic_set(&(rconn->source.in.usage_reserve), 0); + rconn->source.in.usage_reserve = 0; else - atomic_set(&(rconn->source.in.usage_reserve), - rconn->buf.totalsize - + rconn->source.in.usage_reserve = rconn->buf.totalsize - rconn->source.in.usage_ata - rconn->source.in.usage_speed - - rconn->source.in.usage_init); + rconn->source.in.usage_init; bufferusage_init += rconn->source.in.usage_init; bufferusage_speed += rconn->source.in.usage_speed; bufferusage_ata += rconn->source.in.usage_ata; - bufferusage_reserve += atomic_read(&(rconn->source.in.usage_reserve)); + bufferusage_reserve += rconn->source.in.usage_reserve; if (bufferusage_reserve > BUFFERSPACE_RESERVE) { unsigned long iflags; @@ -426,7 +429,7 @@ static __u32 __get_window(struct conn *rconn) { __u64 window = 0; - if (atomic_read(&(rconn->source.in.usage_reserve)) != 0) + if (rconn->source.in.usage_reserve != 0) return 0; BUG_ON(rconn->source.in.usage_init > rconn->source.in.buffer_init); diff --git a/net/cor/settings.h b/net/cor/settings.h index f10b1a01315..b05c78d6a16 100644 --- a/net/cor/settings.h +++ b/net/cor/settings.h @@ -11,6 +11,7 @@ #define GUARANTEED_CMSGS_PER_NEIGH 16 #define MAX_URGENT_CMSGS_PER_NEIGH 8 +#define MAX_URGENT_CMSGS_PER_NEIGH_RETRANSALLOW 6 #define CMSG_INTERVAL_MS 100 diff --git a/net/cor/snd.c b/net/cor/snd.c index d943e8cbb6f..cfb8b9c60f4 100644 --- a/net/cor/snd.c +++ b/net/cor/snd.c @@ -713,7 +713,7 @@ void conn_ack_rcvd(__u32 kpacket_seqno, struct conn *rconn, __u32 seqno, mutex_lock(&(rconn->rcv_lock)); - if (unlikely(seqno - rconn->target.out.seqno_nextsend > 0)) + if (unlikely(((__s32) (seqno - rconn->target.out.seqno_nextsend)) > 0)) goto out; spin_lock_irqsave( &(nb->retrans_lock), iflags ); @@ -750,13 +750,14 @@ void conn_ack_rcvd(__u32 kpacket_seqno, struct conn *rconn, __u32 seqno, struct conn_retrans *cr = container_of( rconn->target.out.retrans_list.next, struct conn_retrans, conn_list); - if (unlikely(cr->seqno - rconn->target.out.seqno_acked > 0)) { + if (unlikely(((__s32) (cr->seqno - + rconn->target.out.seqno_acked)) > 0)) { rconn->target.out.seqno_acked = cr->seqno; } } in_order: - if (likely(seqno - rconn->target.out.seqno_acked > 0)) { + if (likely(((__s32) (seqno - rconn->target.out.seqno_acked)) > 0)) { rconn->target.out.seqno_acked = seqno; setwindow = 1; } -- 2.11.4.GIT