From 0dd215820c16d88dc58e1b62b97390ed9fe1f80f Mon Sep 17 00:00:00 2001 From: Michael Blizek Date: Sat, 24 Jul 2010 13:35:15 +0200 Subject: [PATCH] qos queue --- net/cor/cor.h | 171 ++++++++++------ net/cor/forward.c | 128 +++++++----- net/cor/kpacket_gen.c | 316 ++++++++++++++++++---------- net/cor/kpacket_parse.c | 19 +- net/cor/neighbor.c | 511 +++++++++++++++++++++++++--------------------- net/cor/rcv.c | 88 ++++---- net/cor/snd.c | 533 ++++++++++++++++++++++++++++++++++++++---------- 7 files changed, 1144 insertions(+), 622 deletions(-) diff --git a/net/cor/cor.h b/net/cor/cor.h index 1c8d87b3f30..9e5d7fd4d85 100644 --- a/net/cor/cor.h +++ b/net/cor/cor.h @@ -44,7 +44,7 @@ #define SOCKADDRTYPE_PORT 1 struct cor_sockaddr { int type; - + union { __be64 port; } addr; @@ -68,12 +68,12 @@ struct cor_sockaddr { /* * KP_PING[1] cookie[4] * KP_PONG[1] cookie[4] respdelay[4] - * + * * This is needed to find out whether the other node is reachable. After a new * neighbor is seen, ping requests are sent and the neighbor is only reachable * after a few pongs are received. These requests are also used to find out * whether a neighber is gone. - * + * * respdelay: * The receiver of a ping may delay the sending of the pong e.g. to create * bigger kernel packets. The respdelay is the time in microseconds the packet @@ -85,16 +85,16 @@ struct cor_sockaddr { /* KP_ACK[1] seqno[4] */ #define KP_ACK 4 -/* +/* * KP_ACK_CONN[1] conn_id[4] seqno[4] window[1] * KP_ACK_CONN_OOO[1] conn_id[4] seqno[4] window[1] seqno_ooo[4] length[4] - * + * * conn_id is the conn_id we use if we sent something through this conn and * *not* the conn_id that the neighbor used to send us the data - * + * * seqno = the seqno which is expected in the next non-out-of-order packet * seqno_ooo, length = in case - * + * * window = amount of data which can be sent without receiving the next ack * packets with lower seqno do not overwrite the last window size * note: the other side may also reduce the window size @@ -106,7 +106,7 @@ struct cor_sockaddr { #define KP_ACK_CONN 5 #define KP_ACK_CONN_OOO 6 -/* +/* * NOTE on connection ids: * connection ids we send are used for the receive channel * connection ids we receive are used for the send channel @@ -129,9 +129,9 @@ struct cor_sockaddr { /* KP_CONN_DATA[1] conn_id[4] seqno[4] length[2] data[length] */ #define KP_CONN_DATA 9 -/* - * KP_PING_CONN[1] conn_id[4] - * +/* + * KP_PING_CONN[1] conn_id[4] + * * This is for querying the status of an open connection. The response is either * KP_ACK_CONN or CONNID_UNKNOWN */ @@ -184,12 +184,12 @@ struct cor_sockaddr { /* * CD_LIST_NEIGH sends CDR_BINDATA if the command was successful. The response * format is: - * + * * totalneighs[4] response_rows[4] * for every row: * numaddr[2] (addrtypelen[2] addrlen[2] addrtype[addrtypelen] addr[addrlen] * )[numaddr] - * + * * Neighbors have to be sorted by uptime, new neighbors first. This is so that * the routing daemon can easily find out whether there are new neighbors. It * only needs to send a query with offset 0. If the totalneighs stays the same @@ -201,7 +201,7 @@ struct cor_sockaddr { /* * CD_SET_(FORWARD|BACKWARD)_TIMEOUT[2] length[4] timeout_ms[4] - * + * * If there is no successful communication with the previous or neighbor for * this period, the connection will be reset. This value must be between * NB_STALL_TIME and NB_KILL_TIME. 
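The CD_LIST_NEIGH reply format documented above is variable-length at two levels (rows, then addresses per row), so a reader has to bounds-check as it walks. Below is a minimal userspace sketch of such a walk, assuming the big-endian wire encoding implied by the be16_to_cpu()/put_u32(..., 1) calls elsewhere in this patch; parse_neigh_list() and its printf output are illustrative, not part of the patch.

#include <arpa/inet.h>  /* ntohl(), ntohs() */
#include <stdint.h>
#include <stdio.h>
#include <string.h>

/* Walk one CD_LIST_NEIGH reply: totalneighs[4] response_rows[4], then
 * per row numaddr[2] (addrtypelen[2] addrlen[2] addrtype addr)[numaddr].
 * Returns 0 on success, -1 on a truncated buffer. */
static int parse_neigh_list(const uint8_t *buf, uint32_t len)
{
        uint32_t total, rows, off = 8, r;

        if (len < 8)
                return -1;
        memcpy(&total, buf, 4);
        memcpy(&rows, buf + 4, 4);
        total = ntohl(total);
        rows = ntohl(rows);

        for (r = 0; r < rows; r++) {
                uint16_t numaddr, a;

                if (off + 2 > len)
                        return -1;
                memcpy(&numaddr, buf + off, 2);
                numaddr = ntohs(numaddr);
                off += 2;

                for (a = 0; a < numaddr; a++) {
                        uint16_t typelen, alen;

                        if (off + 4 > len)
                                return -1;
                        memcpy(&typelen, buf + off, 2);
                        memcpy(&alen, buf + off + 2, 2);
                        typelen = ntohs(typelen);
                        alen = ntohs(alen);
                        off += 4;
                        if (off + typelen + alen > len)
                                return -1;
                        printf("row %u: addrtype %.*s, %u addr bytes\n", r,
                                        (int) typelen,
                                        (const char *) (buf + off),
                                        (unsigned) alen);
                        off += typelen + alen;
                }
        }
        /* total > rows means more neighbors exist; reissue the query
         * with a higher offset, as described above. */
        return 0;
}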
Otherwise it will silently behave as if it @@ -222,7 +222,7 @@ struct cor_sockaddr { */ #define CDR_EXECOK 32768 #define CDR_EXECOK_OK 33024 - + #define CDR_EXECFAILED 32769 #define CDR_EXECFAILED_UNKNOWN_COMMAND 33280 @@ -260,13 +260,31 @@ struct htable{ __u32 htable_size; __u32 cell_size; __u32 num_elements; - + int (*matches)(void *htentry, void *searcheditem); __u32 key_offset; __u32 entry_offset; __u32 kref_offset; }; +struct resume_block{ + struct list_head lh; + int in_queue; +}; + +struct announce_data{ + struct kref ref; + + struct list_head lh; + struct net_device *dev; + struct delayed_work announce_work; + struct announce *ann; + struct resume_block rb; + + __u32 curr_announce_msg_offset; + __u64 scheduled_announce_timer; +}; + struct ping_cookie{ unsigned long time; __u32 cookie; @@ -280,12 +298,12 @@ struct ping_cookie{ struct neighbor{ struct list_head nb_list; - + struct kref ref; struct net_device *dev; char mac[MAX_ADDR_LEN]; - + char *addr; __u16 addrlen; @@ -301,14 +319,21 @@ struct neighbor{ __u64 timeout; __u32 cmlength; __u32 ucmlength; - - __u8 ping_all_conns; - + atomic_t cmcnt; /* size of queue + retransmits */ atomic_t ucmcnt; /* size of queue only */ - + + __u8 ping_all_conns; __u8 max_cmsg_delay_sent; - + + /* see snd.c/qos_queue */ + /* protected by cmsg_lock */ + __u8 kp_allmsgs; + + /* procected by queues_lock */ + struct resume_block rb_kp; + struct resume_block rb_cr; + struct mutex pingcookie_lock; unsigned long last_ping_time; __u32 ping_intransit; @@ -316,7 +341,7 @@ struct neighbor{ __u32 lastcookie; atomic_t latency; /* microsecs */ atomic_t max_remote_cmsg_delay; /* microsecs */ - + spinlock_t state_lock; union { __u64 last_state_change;/* initial state */ @@ -329,11 +354,11 @@ struct neighbor{ }state_time; __u8 state; __u16 ping_success; - + struct delayed_work stalltimeout_timer; __u8 str_timer_pending; - - + + atomic_t kpacket_seqno; atomic_t ooo_packets; @@ -345,8 +370,8 @@ struct neighbor{ struct list_head rcv_conn_list; struct list_head snd_conn_list; __u32 num_send_conns; - - /* + + /* * used for ping_all conns, if not zero this is the next conn we need to * ping, protected by conn_list_lock */ @@ -384,17 +409,17 @@ struct cor_sched_data{ struct data_buf_item{ struct list_head buf_list; - + union { struct { char *buf; __u32 datalen; - + }buf; - + struct sk_buff *skb; }data; - + __u8 type; }; @@ -402,12 +427,12 @@ struct data_buf{ struct list_head items; struct data_buf_item *lastread; __u32 first_offset; - + __u32 totalsize; __u32 read_remaining; - + __u16 last_read_offset; - + __u16 last_buflen; }; @@ -436,7 +461,7 @@ struct connlistener { int queue_len; struct list_head conn_queue; wait_queue_head_t wait; - + }; /* @@ -449,8 +474,8 @@ struct connlistener { struct conn{ /* The first member has to be the same as in connlistener (see sock.c)*/ - __u8 sockstate; - + __u8 sockstate; + #define SOURCE_NONE 0 #define SOURCE_IN 1 #define SOURCE_SOCK 2 @@ -461,9 +486,9 @@ struct conn{ __u8 sourcetype:4, targettype:4; - + __u8 qdisc_active; - + /* * isreset values: * 0... 
connection active @@ -474,32 +499,26 @@ struct conn{ * remaining except from this conn */ atomic_t isreset; - + struct list_head queue_list; - + struct kref ref; - - struct mutex rcv_lock; - /* state */ - __u32 credits; - /* credit rate */ - __s32 sender_crate; - __s32 resp_crate; + struct mutex rcv_lock; union{ struct{ struct neighbor *nb; /* list of all connections from this neighbor */ struct list_head nb_list; - + struct sk_buff_head reorder_queue; - + struct htab_entry htab_entry; __u32 conn_id; __u32 next_seqno; __u32 ooo_packets; - + atomic_t pong_awaiting; }in; @@ -517,7 +536,7 @@ struct conn{ __u32 cmdread; __u16 cmd; __u8 *cmdparams; - + __u32 stall_timeout_ms; }unconnected; @@ -530,7 +549,7 @@ struct conn{ struct list_head nb_list; /* protected by nb->retrans_lock, sorted by seqno */ struct list_head retrans_list; - + /* reverse conn_id lookup */ struct htab_entry htab_entry; @@ -539,15 +558,23 @@ struct conn{ __u32 seqno_acked; __u32 seqno_windowlimit; __u32 kp_windowsetseqno; - + + struct resume_block rb; + __u32 stall_timeout_ms; + + /* state */ + __u32 credits; + /* credit rate */ + __s32 sender_crate; + __s32 resp_crate; }out; struct{ wait_queue_head_t wait; }sock; }target; - + struct data_buf buf; struct conn *reversedir; @@ -563,7 +590,7 @@ struct skb_procstate{ struct{ __u32 offset; }announce; - + struct{ __u32 seqno; }rcv2; @@ -631,18 +658,24 @@ extern void ping_resp(struct neighbor *nb, __u32 cookie, __u32 respdelay); extern __u32 add_ping_req(struct neighbor *nb); +extern void unadd_ping_req(struct neighbor *nb, __u32 cookie); + extern int time_to_send_ping(struct neighbor *nb); extern int force_ping(struct neighbor *nb); extern void rcv_announce(struct sk_buff *skb); +extern int send_announce_qos(struct announce_data *ann); + +extern void announce_data_free(struct kref *ref); + extern int __init cor_neighbor_init(void); /* rcv.c */ extern void drain_ooo_queue(struct conn *rconn); -extern void conn_rcv_buildskb(char *data, __u32 datalen, __u32 conn_id, +extern void conn_rcv_buildskb(char *data, __u32 datalen, __u32 conn_id, __u32 seqno); extern int __init cor_rcv_init(void); @@ -668,6 +701,8 @@ extern void retransmit_timerfunc(struct work_struct *work); extern void kern_ack_rcvd(struct neighbor *nb, __u32 seqno); +extern int resume_send_messages(struct neighbor *nb); + extern void send_pong(struct neighbor *nb, __u32 cookie); @@ -702,13 +737,27 @@ extern void cor_kgen_init(void); extern void parse(struct conn *rconn); /* snd.c */ +extern int destroy_queue(struct net_device *dev); + +extern int create_queue(struct net_device *dev); + +#define QOS_CALLER_KPACKET 0 +#define QOS_CALLER_CONN_RETRANS 1 +#define QOS_CALLER_ANNOUNCE 2 +#define QOS_CALLER_CONN 3 + +extern void qos_enqueue(struct net_device *dev, struct resume_block *rb, + int caller); + +extern void qos_enqueue_kpacket(struct neighbor *nb); + extern struct sk_buff *create_packet(struct neighbor *nb, int size, gfp_t alloc_flags, __u32 conn_id, __u32 seqno); extern void cancel_retrans(struct conn *rconn); extern void retransmit_conn_timerfunc(struct work_struct *work); - + extern void conn_ack_rcvd(__u32 kpacket_seqno, struct conn *rconn, __u32 seqno, __u8 window, __u32 seqno_ooo, __u32 length); @@ -721,6 +770,8 @@ extern void databuf_pull(struct data_buf *data, char *dst, int len); extern size_t databuf_pulluser(struct conn *sconn, struct msghdr *msg); +extern void databuf_unpull(struct data_buf *data, __u32 bytes); + extern void databuf_pullold(struct data_buf *data, __u32 startpos, char *dst, int len); diff 
--git a/net/cor/forward.c b/net/cor/forward.c index 758a6b5ff96..1a96ad18b62 100644 --- a/net/cor/forward.c +++ b/net/cor/forward.c @@ -71,7 +71,7 @@ static void databuf_nextreadchunk(struct data_buf *data) } else if (&(data->lastread->buf_list) != data->items.prev) { data->lastread = container_of(data->lastread->buf_list.next, struct data_buf_item, buf_list); - + data->last_read_offset = 0; } } @@ -88,15 +88,15 @@ int _databuf_pull(struct data_buf *data, char *dst, int len, int userbuf) while(len > 0) { int rc = 0; int cpy = len; - + char *srcbuf = 0; int srcbuflen = 0; - + char *srcbufcpystart = 0; int srcbufcpylen = 0; - + BUG_ON(data->lastread == 0); - + if (data->lastread->type == TYPE_BUF) { srcbuf = data->lastread->data.buf.buf; srcbuflen = data->lastread->data.buf.datalen; @@ -112,7 +112,7 @@ int _databuf_pull(struct data_buf *data, char *dst, int len, int userbuf) if (cpy > srcbufcpylen) cpy = srcbufcpylen; - + if (userbuf) { int notcopied = copy_to_user(dst, srcbufcpystart, cpy); cpy -= notcopied; @@ -121,17 +121,17 @@ int _databuf_pull(struct data_buf *data, char *dst, int len, int userbuf) } else { memcpy(dst, srcbufcpystart, cpy); } - + dst += cpy; len -= cpy; totalcpy += cpy; - + data->read_remaining -= cpy; data->last_read_offset += cpy; - + if (cpy == srcbufcpylen) databuf_nextreadchunk(data); - + if (unlikely(rc < 0)) { if (totalcpy == 0) totalcpy = rc; @@ -152,20 +152,20 @@ size_t databuf_pulluser(struct conn *sconn, struct msghdr *msg) size_t copied = 0; int iovidx = 0; int iovread = 0; - + while (iovidx < msg->msg_iovlen) { int rc; - + struct iovec *iov = msg->msg_iov + iovidx; __user char *msg = iov->iov_base + iovread; unsigned int len = iov->iov_len - iovread; - + if (len == 0) { iovidx++; iovread = 0; continue; } - + if (sconn->buf.read_remaining == 0) { if (sconn->sourcetype == SOURCE_NONE) rc = -EPIPE; @@ -174,25 +174,41 @@ size_t databuf_pulluser(struct conn *sconn, struct msghdr *msg) } else { if (len > sconn->buf.read_remaining) len = sconn->buf.read_remaining; - + rc = _databuf_pull(&(sconn->buf), msg, len, 1); } - + BUG_ON(rc == 0); - + if (rc < 0) { if (copied == 0) copied = rc; break; } - + copied += rc; iovread += rc; } - + return copied; } +void databuf_unpull(struct data_buf *data, __u32 bytes) +{ + data->read_remaining += bytes; + + BUG_ON(data->lastread == 0); + + while (bytes > data->last_read_offset) { + bytes -= data->last_read_offset; + data->lastread = container_of(data->lastread->buf_list.prev, + struct data_buf_item, buf_list); + BUG_ON(&(data->lastread->buf_list) == &(data->items)); + } + + data->last_read_offset -= bytes; +} + void databuf_pullold(struct data_buf *data, __u32 startpos, char *dst, int len) { __u32 pos = data->first_offset; @@ -254,7 +270,7 @@ void databuf_pullold(struct data_buf *data, __u32 startpos, char *dst, int len) dst += cpy; len -= cpy; startpos += cpy; - + pos += srcbuflen; dbi = container_of(dbi->buf_list.next, struct data_buf_item, buf_list); @@ -268,10 +284,10 @@ void databuf_ack(struct data_buf *buf, __u32 pos) struct data_buf_item *firstitem = container_of(buf->items.next, struct data_buf_item, buf_list); int firstlen = 0; - + if (firstitem == buf->lastread) break; - + if (firstitem->type == TYPE_BUF) { firstlen = firstitem->data.buf.datalen; } else if (firstitem->type == TYPE_SKB) { @@ -279,12 +295,12 @@ void databuf_ack(struct data_buf *buf, __u32 pos) } else { BUG(); } - + if (((__s32)(buf->first_offset + firstlen - pos)) > 0) break; - + buf->first_offset += firstlen; - + databuf_item_free(firstitem); } } @@ 
-294,10 +310,10 @@ void databuf_ackread(struct data_buf *buf) while (!list_empty(&(buf->items)) && buf->lastread != 0) { struct data_buf_item *firstitem = container_of(buf->items.next, struct data_buf_item, buf_list); - + if (firstitem == buf->lastread) break; - + if (firstitem->type == TYPE_BUF) { buf->first_offset += firstitem->data.buf.datalen; } else if (firstitem->type == TYPE_SKB) { @@ -305,7 +321,7 @@ void databuf_ackread(struct data_buf *buf) } else { BUG(); } - + databuf_item_free(firstitem); } } @@ -337,20 +353,20 @@ void flush_buf(struct conn *rconn) static int _receive_buf(struct conn *rconn, char *buf, int len, int userbuf) { struct data_buf_item *item = 0; - + int totalcpy = 0; - + BUG_ON(databuf_maypush(&(rconn->buf)) < len); - + if (list_empty(&(rconn->buf.items)) == 0) { struct list_head *last = rconn->buf.items.prev; item = container_of(last, struct data_buf_item, buf_list); - + if (item->type != TYPE_BUF || rconn->buf.last_buflen <= item->data.buf.datalen) item = 0; } - + while (len > 0) { int rc = 0; int cpy = len; @@ -366,8 +382,8 @@ static int _receive_buf(struct conn *rconn, char *buf, int len, int userbuf) memset(item, 0, sizeof(item)); item->type = TYPE_BUF; item->data.buf.buf = kmalloc(buflen, GFP_KERNEL); - - + + if (unlikely(item->data.buf.buf == 0)) { kmem_cache_free(data_buf_item_slab, item); rc = -ENOMEM; @@ -377,13 +393,13 @@ static int _receive_buf(struct conn *rconn, char *buf, int len, int userbuf) list_add_tail(&(item->buf_list), &(rconn->buf.items)); rconn->buf.last_buflen = buflen; } - + BUG_ON(item->type != TYPE_BUF); BUG_ON(rconn->buf.last_buflen <= item->data.buf.datalen); - + if (rconn->buf.last_buflen - item->data.buf.datalen < cpy) cpy = (rconn->buf.last_buflen - item->data.buf.datalen); - + if (userbuf) { int notcopied = copy_from_user(item->data.buf.buf + item->data.buf.datalen, buf, cpy); @@ -394,13 +410,13 @@ static int _receive_buf(struct conn *rconn, char *buf, int len, int userbuf) memcpy(item->data.buf.buf + item->data.buf.datalen, buf, cpy); } - + buf += cpy; len -= cpy; totalcpy += cpy; - + item->data.buf.datalen += cpy; - + error: if (unlikely(rc < 0)) { if (totalcpy == 0) @@ -408,7 +424,7 @@ error: break; } } - + rconn->buf.read_remaining += totalcpy; return totalcpy; @@ -419,25 +435,25 @@ int receive_userbuf(struct conn *rconn, struct msghdr *msg) int copied = 0; int iovidx = 0; int iovread = 0; - + if (databuf_maypush(&(rconn->buf)) <= 0) return -EAGAIN; - + while (iovidx < msg->msg_iovlen) { struct iovec *iov = msg->msg_iov + iovidx; __user char *userbuf = iov->iov_base + iovread; int len = iov->iov_len - iovread; int rc; int pushlimit; - + if (len == 0) { iovidx++; iovread = 0; continue; } - + pushlimit = databuf_maypush(&(rconn->buf)); - + if (pushlimit <= 0) { if (rconn->targettype == TARGET_UNCONNECTED) rc = -EPIPE; @@ -449,7 +465,7 @@ int receive_userbuf(struct conn *rconn, struct msghdr *msg) rc = _receive_buf(rconn, userbuf, len, 1); } - + if (rc < 0) { if (copied == 0) copied = rc; @@ -459,10 +475,10 @@ int receive_userbuf(struct conn *rconn, struct msghdr *msg) copied += rc; iovread += rc; } - + if (copied > 0) flush_buf(rconn);; - + return copied; } @@ -476,23 +492,23 @@ void receive_buf(struct conn *rconn, char *buf, int len) int receive_skb(struct conn *rconn, struct sk_buff *skb) { struct data_buf_item *item; - + if (databuf_maypush(&(rconn->buf)) < skb->len) return 1; - + item = kmem_cache_alloc(data_buf_item_slab, GFP_KERNEL); - + if (unlikely(item == 0)) return 1; - + item->data.skb = skb; item->type = TYPE_SKB; 
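databuf_unpull() in the forward.c hunk above rewinds the read position after a partially sent packet. Here is a standalone toy model of that rewind, with an array standing in for the data_buf_item list. Unlike the list-based loop above, this model refreshes last_read_offset from the previous chunk's length on each step back, which appears to be the intended invariant; the in-kernel loop leaves the field untouched while stepping, so it only looks safe for rewinds that do not cross more than one item boundary.

#include <assert.h>
#include <stdint.h>

/* Toy model of databuf_unpull(): give back 'bytes' of already-read
 * data. chunk_len[] stands in for the data_buf_item list; lastread and
 * last_read_offset mirror the fields of struct data_buf. */
struct toy_databuf {
        uint32_t chunk_len[8];          /* lengths of buffered items */
        int lastread;                   /* index of the item being read */
        uint32_t last_read_offset;      /* bytes consumed inside it */
        uint32_t read_remaining;
};

static void toy_unpull(struct toy_databuf *d, uint32_t bytes)
{
        d->read_remaining += bytes;

        /* Step back whole items while the rewind crosses an item start. */
        while (bytes > d->last_read_offset) {
                bytes -= d->last_read_offset;
                assert(d->lastread > 0);        /* mirrors the BUG_ON() */
                d->lastread--;
                /* the previous item had been read completely */
                d->last_read_offset = d->chunk_len[d->lastread];
        }
        d->last_read_offset -= bytes;
}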
list_add_tail(&(item->buf_list), &(rconn->buf.items)); rconn->buf.read_remaining += skb->len; rconn->buf.last_buflen = 0; - + flush_buf(rconn); - + return 0; } diff --git a/net/cor/kpacket_gen.c b/net/cor/kpacket_gen.c index 917a34d7d70..bd24e0f0816 100644 --- a/net/cor/kpacket_gen.c +++ b/net/cor/kpacket_gen.c @@ -39,26 +39,26 @@ struct control_msg_out{ struct list_head lh; /* either neighbor or control_retrans_packet */ struct neighbor *nb; - + __u32 length; - + __u8 type; union{ struct{ __u32 cookie; unsigned long time_enqueued; /* jiffies */ }pong; - + struct{ __u32 seqno; }ack; - + struct{ __u32 conn_id; __u32 seqno; __u8 window; }ack_conn; - + struct{ __u32 conn_id; __u32 seqno; @@ -70,7 +70,7 @@ struct control_msg_out{ struct{ __u32 conn_id; }connect; - + struct{ __u32 rcvd_conn_id; __u32 gen_conn_id; @@ -79,7 +79,7 @@ struct control_msg_out{ struct{ __u32 conn_id; }reset; - + struct{ __u32 conn_id; __u32 seqno; @@ -87,15 +87,15 @@ struct control_msg_out{ char *data; __u32 datalen; }conn_data; - + struct{ __u32 conn_id; }ping_conn; - + struct{ __u32 conn_id; }connid_unknown; - + struct{ __u32 delay; }set_max_cmsg_delay; @@ -104,14 +104,14 @@ struct control_msg_out{ struct control_retrans { struct kref ref; - + struct neighbor *nb; __u32 seqno; - + unsigned long timeout; - + struct list_head msgs; - + struct htab_entry htab_entry; struct list_head timeout_list; }; @@ -161,17 +161,18 @@ static struct control_msg_out *_alloc_control_msg(struct neighbor *nb, struct control_msg_out *cm = 0; BUG_ON(nb == 0); - + if (urgent == 0) { - long packets1; - long packets2; - packets1 = atomic_inc_return(&(nb->cmcnt)); + long packets1 = atomic_inc_return(&(nb->cmcnt)); + long packets2 = atomic_inc_return(&(cmcnt)); + BUG_ON(packets1 <= 0); + BUG_ON(packets2 <= 0); + if (packets1 <= calc_limit(GUARANTEED_CMSGS_PER_NEIGH, priority)) goto alloc; - packets2 = atomic_inc_return(&(cmcnt)); - BUG_ON(packets2 <= 0); + if (unlikely(unlikely(packets2 > calc_limit(MAX_CMSGS_PER_NEIGH, priority)) || unlikely(packets1 > ( calc_limit(MAX_CMSGS_PER_NEIGH, priority) * @@ -206,7 +207,7 @@ void free_control_msg(struct control_msg_out *cm) atomic_dec(&(cm->nb->cmcnt)); atomic_dec(&(cmcnt)); } - + kmem_cache_free(controlmsg_slab, cm); } @@ -248,7 +249,7 @@ void retransmit_timerfunc(struct work_struct *work) struct neighbor *nb = container_of(to_delayed_work(work), struct neighbor, retrans_timer); - + int nbstate; int nbput = 0; @@ -267,12 +268,12 @@ void retransmit_timerfunc(struct work_struct *work) nbput = 1; break; } - + cr = container_of(nb->retrans_list.next, struct control_retrans, timeout_list); - + BUG_ON(cr->nb != nb); - + rm.seqno = cr->seqno; rm.nb = nb; @@ -319,20 +320,20 @@ void retransmit_timerfunc(struct work_struct *work) static void schedule_retransmit(struct control_retrans *cr, struct neighbor *nb) { unsigned long iflags; - + struct retransmit_matchparam rm; int first; rm.seqno = cr->seqno; rm.nb = nb; - + set_retrans_timeout(cr, nb); - + spin_lock_irqsave( &(nb->retrans_lock), iflags ); htable_insert(&retransmits, (char *) cr, rm_to_key(&rm)); first = list_empty(&(nb->retrans_list)); list_add_tail(&(cr->timeout_list), &(nb->retrans_list)); - + if (first && nb->retrans_timer_running == 0) { schedule_delayed_work(&(nb->retrans_timer), cr->timeout - jiffies); @@ -395,7 +396,7 @@ static int add_ack(struct sk_buff *skb, struct control_retrans *cr, dst = skb_put(skb, 5); BUG_ON(dst == 0); - + dst[0] = KP_ACK; put_u32(dst + 1, cm->msg.ack.seqno, 1); @@ -415,7 +416,7 @@ static int add_ack_conn(struct 
sk_buff *skb, struct control_retrans *cr, dst = skb_put(skb, 10); BUG_ON(dst == 0); - + dst[0] = KP_ACK_CONN; put_u32(dst + 1, cm->msg.ack_conn.conn_id, 1); put_u32(dst + 5, cm->msg.ack_conn.seqno, 1); @@ -436,7 +437,7 @@ static int add_ack_conn_ooo(struct sk_buff *skb, struct control_retrans *cr, dst = skb_put(skb, 18); BUG_ON(dst == 0); - + dst[0] = KP_ACK_CONN_OOO; put_u32(dst + 1, cm->msg.ack_conn_ooo.conn_id, 1); put_u32(dst + 5, cm->msg.ack_conn_ooo.seqno, 1); @@ -476,7 +477,7 @@ static int add_pong(struct sk_buff *skb, struct control_retrans *cr, dst = skb_put(skb, 9); BUG_ON(dst == 0); - + dst[0] = KP_PONG; put_u32(dst + 1, cm->msg.pong.cookie, 0); put_u32(dst + 5, 1000 * jiffies_to_msecs(jiffies - @@ -493,13 +494,13 @@ static int add_connect(struct sk_buff *skb, struct control_retrans *cr, struct control_msg_out *cm, int spaceleft) { char *dst; - + if (unlikely(spaceleft < 5)) return 0; - + dst = skb_put(skb, 5); BUG_ON(dst == 0); - + dst[0] = KP_CONNECT; put_u32(dst + 1, cm->msg.connect.conn_id, 1); @@ -518,7 +519,7 @@ static int add_connect_success(struct sk_buff *skb, struct control_retrans *cr, dst = skb_put(skb, 9); BUG_ON(dst == 0); - + dst[0] = KP_CONNECT_SUCCESS; put_u32(dst + 1, cm->msg.connect_success.rcvd_conn_id, 1); put_u32(dst + 5, cm->msg.connect_success.gen_conn_id, 1); @@ -547,15 +548,19 @@ static int add_reset_conn(struct sk_buff *skb, struct control_retrans *cr, return 5; } -static int add_conndata(struct sk_buff *skb, struct control_msg_out *cm, - int spaceleft) +static int add_conndata(struct sk_buff *skb, struct control_retrans *cr, + struct control_msg_out *cm, int spaceleft, + struct control_msg_out **split_conndata, __u32 *sc_sendlen) { char *dst; - + int totallen = cm->msg.conn_data.datalen + 11; int putlen = min(totallen, spaceleft); int dataputlen = putlen - 11; + BUG_ON(split_conndata == 0); + BUG_ON(sc_sendlen == 0); + if (dataputlen < 1 || (spaceleft < 25 && spaceleft < totallen)) return 0; @@ -566,23 +571,16 @@ static int add_conndata(struct sk_buff *skb, struct control_msg_out *cm, put_u32(dst + 1, cm->msg.conn_data.conn_id, 1); put_u32(dst + 5, cm->msg.conn_data.seqno, 1); put_u16(dst + 9, dataputlen, 1); - + memcpy(dst + 11, cm->msg.conn_data.data, dataputlen); - - cm->msg.conn_data.datalen -= dataputlen; - cm->msg.conn_data.data += dataputlen; - - if (cm->msg.conn_data.datalen == 0) { - kfree(cm->msg.conn_data.data_orig); - free_control_msg(cm); + + if (cm->msg.conn_data.datalen == dataputlen) { + list_add_tail(&(cm->lh), &(cr->msgs)); } else { - send_conndata(cm, cm->msg.conn_data.conn_id, - cm->msg.conn_data.seqno, - cm->msg.conn_data.data_orig, - cm->msg.conn_data.data, - cm->msg.conn_data.datalen); + *split_conndata = cm; + *sc_sendlen = dataputlen; } - + return putlen; } @@ -599,7 +597,7 @@ static int add_ping_conn(struct sk_buff *skb, struct control_retrans *cr, dst[0] = KP_PING_CONN; put_u32(dst + 1, cm->msg.ping_conn.conn_id, 1); - + list_add_tail(&(cm->lh), &(cr->msgs)); return 5; @@ -662,8 +660,12 @@ static int add_set_max_cmsg_dly(struct sk_buff *skb, struct control_retrans *cr, } static int add_message(struct sk_buff *skb, struct control_retrans *cr, - struct control_msg_out *cm, int spaceleft) + struct control_msg_out *cm, int spaceleft, + struct control_msg_out **split_conndata, __u32 *sc_sendlen) { + BUG_ON(split_conndata != 0 && *split_conndata != 0); + BUG_ON(sc_sendlen != 0 && *sc_sendlen != 0); + switch (cm->type) { case MSGTYPE_ACK: return add_ack(skb, cr, cm, spaceleft); @@ -680,7 +682,8 @@ static int add_message(struct 
sk_buff *skb, struct control_retrans *cr, case MSGTYPE_RESET_CONN: return add_reset_conn(skb, cr, cm, spaceleft); case MSGTYPE_CONNDATA: - return add_conndata(skb, cm, spaceleft); + return add_conndata(skb, cr, cm, spaceleft, split_conndata, + sc_sendlen); case MSGTYPE_PING_CONN: return add_ping_conn(skb, cr, cm, spaceleft); case MSGTYPE_CONNID_UNKNOWN: @@ -718,13 +721,13 @@ static __u32 __send_messages_pc(struct neighbor *nb, struct sk_buff *skb, struct list_head *next; struct control_msg_out *cm; int rc; - + rconn = nb->next_ping_conn; sconn = rconn->reversedir; - + BUG_ON(rconn->targettype != TARGET_OUT); BUG_ON(sconn->sourcetype != SOURCE_IN); - + if (unlikely(rconn->target.out.conn_id)) goto next; @@ -749,7 +752,7 @@ static __u32 __send_messages_pc(struct neighbor *nb, struct sk_buff *skb, cm->length = 5; cm->type = MSGTYPE_PING_CONN; cm->msg.ping_conn.conn_id = rconn->target.out.conn_id; - rc = add_message(skb, cr, cm, spaceleft - length); + rc = add_message(skb, cr, cm, spaceleft - length, 0, 0); if (rc == 0) break; @@ -771,33 +774,35 @@ next: } static __u32 __send_messages(struct neighbor *nb, struct sk_buff *skb, - struct control_retrans *cr, int spaceleft, int urgentonly) + struct control_retrans *cr, int spaceleft, int urgentonly, + struct control_msg_out **split_conndata, __u32 *sc_sendlen) { __u32 length = 0; while (!list_empty(&(nb->ucontrol_msgs_out)) || (!urgentonly && !list_empty(&(nb->control_msgs_out)))) { int rc; - + int urgent = !list_empty(&(nb->ucontrol_msgs_out)); - + struct control_msg_out *cm; - + if (urgent) cm = container_of(nb->ucontrol_msgs_out.next, struct control_msg_out, lh); - else + else cm = container_of(nb->control_msgs_out.next, struct control_msg_out, lh); - + list_del(&(cm->lh)); if (urgent) nb->ucmlength -= cm->length; else nb->cmlength -= cm->length; mutex_unlock(&(nb->cmsg_lock)); - rc = add_message(skb, cr, cm, spaceleft - length); + rc = add_message(skb, cr, cm, spaceleft - length, + split_conndata, sc_sendlen); mutex_lock(&(nb->cmsg_lock)); - + if (rc == 0) { if (urgent) { list_add(&(cm->lh), &(nb->ucontrol_msgs_out)); @@ -808,10 +813,10 @@ static __u32 __send_messages(struct neighbor *nb, struct sk_buff *skb, } break; } - + length += rc; } - + return length; } @@ -832,7 +837,7 @@ static int ping_all_conns_needed(struct neighbor *nb) curr = curr->next; } - + return 1; } @@ -851,7 +856,7 @@ static int __send_messages_smcd(struct neighbor *nb, struct sk_buff *skb, cm->msg.set_max_cmsg_delay.delay = CMSG_INTERVAL_MS * 10; cm->length = 5; - rc = add_message(skb, cr, cm, spaceleft); + rc = add_message(skb, cr, cm, spaceleft, 0, 0); nb->max_cmsg_delay_sent = 1; return rc; @@ -871,23 +876,28 @@ static int __send_messages_pac(struct neighbor *nb, struct sk_buff *skb, cm->type = MSGTYPE_PING_ALL_CONNS; cm->length = 1; - rc = add_message(skb, cr, cm, spaceleft); + rc = add_message(skb, cr, cm, spaceleft, 0, 0); nb->ping_all_conns = 0; return rc; } -static void _send_messages(struct neighbor *nb, struct sk_buff *skb, int ping, +static int _send_messages(struct neighbor *nb, struct sk_buff *skb, int ping, struct control_retrans *cr, int spaceleft, int urgentonly) { + int rc; int length = 0; + __u32 pingcookie = 0; + struct control_msg_out *split_conndata = 0; + __u32 sc_sendlen = 0; mutex_lock(&(nb->cmsg_lock)); if (ping != 0) { - __u32 pingcookie = add_ping_req(nb); - int rc = add_ping(skb, pingcookie, spaceleft - length); + int rc; + pingcookie = add_ping_req(nb); + rc = add_ping(skb, pingcookie, spaceleft - length); BUG_ON(rc == 0); length += rc; } @@ 
-897,8 +907,9 @@ static void _send_messages(struct neighbor *nb, struct sk_buff *skb, int ping, if (likely(urgentonly == 0) && unlikely(nb->max_cmsg_delay_sent == 0)) length += __send_messages_smcd(nb, skb, cr, spaceleft - length); - - length += __send_messages(nb, skb, cr, spaceleft - length, urgentonly); + + length += __send_messages(nb, skb, cr, spaceleft - length, urgentonly, + &split_conndata, &sc_sendlen); if (likely(urgentonly == 0)) length += __send_messages_pc(nb, skb, cr, spaceleft - length); @@ -910,12 +921,63 @@ static void _send_messages(struct neighbor *nb, struct sk_buff *skb, int ping, padding(skb, spaceleft - length); - if (list_empty(&(cr->msgs))) + rc = dev_queue_xmit(skb); + + if (rc != 0) { + unadd_ping_req(nb, pingcookie); + + while (list_empty(&(cr->msgs)) == 0) { + struct control_msg_out *cm = container_of(cr->msgs.prev, + struct control_msg_out, lh); + list_del(&(cm->lh)); + add_control_msg(cm, 1); + } + + if (split_conndata != 0) { + add_control_msg(split_conndata, 1); + } + kref_put(&(cr->ref), free_control_retrans); - else - schedule_retransmit(cr, nb); + } else { + struct list_head *curr = cr->msgs.next; + + while(curr != &(cr->msgs)) { + struct control_msg_out *cm = container_of(curr, + struct control_msg_out, lh); + + curr = curr->next; + + if (cm->type == MSGTYPE_CONNDATA) { + list_del(&(cm->lh)); + kfree(cm->msg.conn_data.data_orig); + free_control_msg(cm); + } + } + + if (split_conndata != 0) { + BUG_ON(sc_sendlen == 0); + BUG_ON(sc_sendlen >= + split_conndata->msg.conn_data.datalen); + + split_conndata->msg.conn_data.data += sc_sendlen; + split_conndata->msg.conn_data.datalen -= sc_sendlen; + + send_conndata(split_conndata, + split_conndata->msg.conn_data.conn_id, + split_conndata->msg.conn_data.seqno, + split_conndata->msg.conn_data.data_orig, + split_conndata->msg.conn_data.data, + split_conndata->msg.conn_data.datalen); + } + + + if (list_empty(&(cr->msgs))) + kref_put(&(cr->ref), free_control_retrans); + else + schedule_retransmit(cr, nb); + } - dev_queue_xmit(skb); + return rc; } static __u32 get_total_messages_length(struct neighbor *nb, int ping, @@ -954,8 +1016,9 @@ static __u32 get_total_messages_length(struct neighbor *nb, int ping, return length; } -static void send_messages(struct neighbor *nb, int allmsgs) +static int send_messages(struct neighbor *nb, int allmsgs, int resume) { + int rc = 0; int ping; int targetmss = mss(nb); @@ -964,15 +1027,18 @@ static void send_messages(struct neighbor *nb, int allmsgs) mutex_lock(&(nb->cmsg_lock)); + if (resume) + allmsgs = nb->kp_allmsgs; + ping = time_to_send_ping(nb); while (1) { __u32 length; - + __u32 seqno; struct sk_buff *skb; struct control_retrans *cr; - + BUG_ON(list_empty(&(nb->control_msgs_out)) && (nb->cmlength != 0)); BUG_ON((list_empty(&(nb->control_msgs_out)) == 0) && @@ -983,15 +1049,15 @@ static void send_messages(struct neighbor *nb, int allmsgs) (nb->ucmlength == 0)); BUG_ON(nb->cmlength < 0); BUG_ON(nb->ucmlength < 0); - + length = get_total_messages_length(nb, ping, urgentonly); - + if (length == 0) break; if (length < targetmss && allmsgs == 0) break; - + seqno = atomic_add_return(1, &(nb->kpacket_seqno)); if (length > targetmss) @@ -1002,7 +1068,7 @@ static void send_messages(struct neighbor *nb, int allmsgs) if (unlikely(skb == 0)) { printk(KERN_ERR "cor: send_messages: cannot allocate " "skb (out of memory?)"); - return; + goto oom; } cr = kmem_cache_alloc(controlretrans_slab, GFP_KERNEL); @@ -1010,7 +1076,7 @@ static void send_messages(struct neighbor *nb, int allmsgs) 
kfree_skb(skb); printk(KERN_ERR "cor: send_messages: cannot allocate " "control_retrans (out of memory?)"); - return; + goto oom; } memset(cr, 0, sizeof(struct control_retrans)); kref_init(&(cr->ref)); @@ -1018,13 +1084,40 @@ static void send_messages(struct neighbor *nb, int allmsgs) cr->seqno = seqno; INIT_LIST_HEAD(&(cr->msgs)); - _send_messages(nb, skb, ping, cr, length, urgentonly); + rc = _send_messages(nb, skb, ping, cr, length, urgentonly); ping = 0; mutex_lock(&(nb->cmsg_lock)); + + if (rc != 0) + break; + } + + if (0) { +oom: + mutex_lock(&(nb->cmsg_lock)); + } + + if (rc != 0) { + if (resume == 0) { + nb->kp_allmsgs = nb->kp_allmsgs || allmsgs; + qos_enqueue_kpacket(nb); + } + } else if (allmsgs) { + nb->kp_allmsgs = 0; } mutex_unlock(&(nb->cmsg_lock)); + + if (allmsgs) + schedule_controlmsg_timerfunc(nb); + + return rc; +} + +int resume_send_messages(struct neighbor *nb) +{ + return send_messages(nb, 0, 1); } static void controlmsg_timerfunc(struct work_struct *work) @@ -1034,18 +1127,17 @@ static void controlmsg_timerfunc(struct work_struct *work) __u64 jiffies = get_jiffies_64(); mutex_lock(&(nb->cmsg_lock)); - + if (nb->timeout > jiffies) { INIT_DELAYED_WORK(&(nb->cmsg_timer), controlmsg_timerfunc); schedule_delayed_work(&(nb->cmsg_timer), nb->timeout - jiffies); mutex_unlock(&(nb->cmsg_lock)); return; } - + mutex_unlock(&(nb->cmsg_lock)); - - send_messages(nb, 1); - schedule_controlmsg_timerfunc(nb); + + send_messages(nb, 1, 0); kref_put(&(nb->ref), neighbor_free); } @@ -1053,21 +1145,21 @@ void schedule_controlmsg_timerfunc(struct neighbor *nb) { __u64 jiffies = get_jiffies_64(); long long delay; - + int state = get_neigh_state(nb); - + if (unlikely(state == NEIGHBOR_STATE_KILLED)) return; - + mutex_lock(&(nb->cmsg_lock)); nb->timeout += msecs_to_jiffies(CMSG_INTERVAL_MS); - + delay = nb->timeout - jiffies; if (delay < 0) { - delay = 0; + delay = 1; nb->timeout = jiffies; } - + INIT_DELAYED_WORK(&(nb->cmsg_timer), controlmsg_timerfunc); schedule_delayed_work(&(nb->cmsg_timer), delay); mutex_unlock(&(nb->cmsg_lock)); @@ -1079,6 +1171,8 @@ static void free_oldest_ucm(struct neighbor *nb) struct control_msg_out *cm = container_of(nb->ucontrol_msgs_out.next, struct control_msg_out, lh); + BUG_ON(isurgent(cm) == 0); + list_del(&(cm->lh)); nb->ucmlength -= cm->length; atomic_dec(&(nb->ucmcnt)); @@ -1124,10 +1218,10 @@ static void add_control_msg(struct control_msg_out *cm, int retrans) cm->nb->cmlength += cm->length; list_add_tail(&(cm->lh), &(cm->nb->control_msgs_out)); } - + if (unlikely((nbstate == NEIGHBOR_STATE_ACTIVE ? 
cm->nb->cmlength : 0)+ cm->nb->ucmlength >= mss(cm->nb))) - send_messages(cm->nb, 0); + send_messages(cm->nb, 0, 0); out: mutex_unlock(&(cm->nb->cmsg_lock)); @@ -1136,10 +1230,10 @@ out: void send_pong(struct neighbor *nb, __u32 cookie) { struct control_msg_out *cm = _alloc_control_msg(nb, 0, 1); - + if (unlikely(cm == 0)) return; - + cm->nb = nb; cm->type = MSGTYPE_PONG; cm->msg.pong.cookie = cookie; @@ -1159,10 +1253,10 @@ void send_reset_conn(struct control_msg_out *cm, __u32 conn_id) void send_ack(struct neighbor *nb, __u32 seqno) { struct control_msg_out *cm = _alloc_control_msg(nb, 0, 1); - + if (unlikely(cm == 0)) return; - + cm->nb = nb; cm->type = MSGTYPE_ACK; cm->msg.ack.seqno = seqno; @@ -1177,7 +1271,7 @@ void send_ack_conn(struct control_msg_out *cm, __u32 conn_id, __u32 seqno, cm->msg.ack_conn.conn_id = conn_id; cm->msg.ack_conn.seqno = seqno; cm->msg.ack_conn.window = window; - + cm->length = 10; add_control_msg(cm, 0); } @@ -1255,7 +1349,7 @@ static int matches_connretrans(void *htentry, void *searcheditem) struct control_retrans *cr = (struct control_retrans *) htentry; struct retransmit_matchparam *rm = (struct retransmit_matchparam *) searcheditem; - + return rm->nb == cr->nb && rm->seqno == cr->seqno; } diff --git a/net/cor/kpacket_parse.c b/net/cor/kpacket_parse.c index 4591547488d..c59fbedd520 100644 --- a/net/cor/kpacket_parse.c +++ b/net/cor/kpacket_parse.c @@ -136,7 +136,7 @@ static void parse_ack_conn(struct neighbor *nb, struct sk_buff *skb, { __u32 seqno = pull_u32(skb, 1); __u8 window = pull_u8(skb); - + pong_rcvd(rconn); conn_ack_rcvd(kpacket_seqno, rconn->reversedir, seqno, window, 0, 0); @@ -157,7 +157,7 @@ static void parse_ack_conn_ooo(struct neighbor *nb, struct sk_buff *skb, __u8 window = pull_u8(skb); __u32 seqno_ooo = pull_u32(skb, 1); __u32 length = pull_u32(skb, 1); - + conn_ack_rcvd(kpacket_seqno, rconn->reversedir, seqno, window, seqno_ooo, length); } @@ -174,7 +174,7 @@ static void discard_conn_success(struct neighbor *nb, struct sk_buff *skb) } static void parse_conn_success(struct neighbor *nb, struct sk_buff *skb, - __u32 seqno, struct conn *rconn) + __u32 seqno, struct conn *rconn) { struct conn *sconn = rconn->reversedir; @@ -194,10 +194,9 @@ static void parse_conn_success(struct neighbor *nb, struct sk_buff *skb, if (likely(sconn->target.out.conn_id == 0)) { sconn->target.out.conn_id = conn_id; - if (unlikely(atomic_read(&(rconn->isreset)) != 0)) { + if (unlikely(atomic_read(&(sconn->isreset)) != 0)) goto reset; - } - + insert_reverse_connid(sconn); } @@ -226,10 +225,10 @@ static void parse_ping_conn(struct neighbor *nb, struct sk_buff *skb, __u32 seqno, struct conn *rconn) { struct control_msg_out *cm = alloc_control_msg(nb, ACM_PRIORITY_MED); - + if (unlikely(cm == 0)) return; - + mutex_lock(&(rconn->rcv_lock)); #warning todo set window send_ack_conn(cm, rconn->reversedir->target.out.conn_id, @@ -276,7 +275,7 @@ static void parse_conndata(struct neighbor *nb, struct sk_buff *skb) __u32 seqno = pull_u32(skb, 1); __u16 datalength = pull_u16(skb, 1); char *data = cor_pull_skb(skb, datalength); - + BUG_ON(data == 0); conn_rcv_buildskb(data, datalength, conn_id, seqno); @@ -433,7 +432,7 @@ void kernel_packet(struct neighbor *nb, struct sk_buff *skb, __u32 seqno) if (codeptr == 0) break; code = *codeptr; - + switch (code) { case KP_PADDING: break; diff --git a/net/cor/neighbor.c b/net/cor/neighbor.c index 8b8a50cb22d..208cef45667 100644 --- a/net/cor/neighbor.c +++ b/net/cor/neighbor.c @@ -36,14 +36,14 @@ * chunk 1 contains the checksum of the 
data in chunk 1 * chunk 2 contains the checksum of the data in chunk 1+2 * ... - * + * * Data format of the announce packet "data" field: * min_announce_proto_version [4] * max_announce_proto_version [4] * min_cor_proto_version [4] * max_cor_proto_version [4] * versions which are understood - * + * * command [4] * commandlength [4] * commanddata [commandlength] @@ -86,7 +86,7 @@ static int get_addrtype(__u32 addrtypelen, char *addrtype) (addrtype[0] == 'i' || addrtype[0] == 'I') && (addrtype[1] == 'd' || addrtype[1] == 'D')) return ADDRTYPE_ID; - + return ADDRTYPE_UNKNOWN; } @@ -109,12 +109,12 @@ static struct neighbor *alloc_neighbor(gfp_t allocflags) { struct neighbor *nb = kmem_cache_alloc(nb_slab, allocflags); __u32 seqno; - + if (unlikely(nb == 0)) return 0; - + memset(nb, 0, sizeof(struct neighbor)); - + kref_init(&(nb->ref)); mutex_init(&(nb->cmsg_lock)); INIT_LIST_HEAD(&(nb->control_msgs_out)); @@ -133,7 +133,7 @@ static struct neighbor *alloc_neighbor(gfp_t allocflags) spin_lock_init(&(nb->retrans_lock)); INIT_LIST_HEAD(&(nb->retrans_list)); INIT_LIST_HEAD(&(nb->retrans_list_conn)); - + return nb; } @@ -141,22 +141,22 @@ struct neighbor *get_neigh_by_mac(struct sk_buff *skb) { struct list_head *currlh; struct neighbor *ret = 0; - - + + char source_hw[MAX_ADDR_LEN]; memset(source_hw, 0, MAX_ADDR_LEN); if (skb->dev->header_ops != 0 && skb->dev->header_ops->parse != 0) skb->dev->header_ops->parse(skb, source_hw); - + mutex_lock(&(neighbor_operation_lock)); - + currlh = nb_list.next; - + while (currlh != &nb_list) { struct neighbor *curr = container_of(currlh, struct neighbor, nb_list); - + if (memcmp(curr->mac, source_hw, MAX_ADDR_LEN) == 0) { ret = curr; kref_get(&(ret->ref)); @@ -167,7 +167,7 @@ struct neighbor *get_neigh_by_mac(struct sk_buff *skb) } mutex_unlock(&(neighbor_operation_lock)); - + return ret; } @@ -176,79 +176,79 @@ struct neighbor *find_neigh(__u16 addrtypelen, __u8 *addrtype, { struct list_head *currlh; struct neighbor *ret = 0; - + if (get_addrtype(addrtypelen, addrtype) != ADDRTYPE_ID) return 0; - + mutex_lock(&(neighbor_operation_lock)); - + currlh = nb_list.next; - + while (currlh != &nb_list) { struct neighbor *curr = container_of(currlh, struct neighbor, nb_list); - + if (curr->addrlen == addrlen && memcmp(curr->addr, addr, addrlen) == 0) { ret = curr; kref_get(&(ret->ref)); - + goto out; } currlh = currlh->next; } - + out: mutex_unlock(&(neighbor_operation_lock)); - + return ret; } __u32 generate_neigh_list(char *buf, __u32 buflen, __u32 limit, __u32 offset) { struct list_head *currlh; - + char *p_totalneighs = buf; char *p_response_rows = buf + 4; - + int bufferfull = 0; - + __u32 total = 0; __u32 cnt = 0; - + __u32 buf_offset = 8; - + BUG_ON(buf == 0); BUG_ON(buflen < 8); - + mutex_lock(&(neighbor_operation_lock)); - + currlh = nb_list.next; - + while (currlh != &nb_list) { struct neighbor *curr = container_of(currlh, struct neighbor, nb_list); - + __u8 state; unsigned long iflags; /* get_neigh_state not used here because it would deadlock */ spin_lock_irqsave( &(curr->state_lock), iflags ); state = curr->state; spin_unlock_irqrestore( &(curr->state_lock), iflags ); - + if (state != NEIGHBOR_STATE_ACTIVE) goto cont2; - + if (total < offset) goto cont; - + if (unlikely(buflen - buf_offset - 6 - 2 - curr->addrlen < 0)) bufferfull = 1; - + if (bufferfull) goto cont; - + put_u16(buf + buf_offset, 1, 1);/* numaddr */ buf_offset += 2; put_u16(buf + buf_offset, 2, 1);/* addrtypelen */ @@ -263,7 +263,7 @@ __u32 generate_neigh_list(char *buf, __u32 buflen, __u32 
limit, __u32 offset) buf_offset += curr->addrlen; BUG_ON(buf_offset > buflen); - + cnt++; cont: @@ -271,12 +271,12 @@ cont: cont2: currlh = currlh->next; } - + mutex_unlock(&(neighbor_operation_lock)); - + put_u32(p_totalneighs, total, 1); put_u32(p_response_rows, cnt, 1); - + return buf_offset; } @@ -287,11 +287,11 @@ void set_last_routdtrip(struct neighbor *nb, unsigned long time) BUG_ON(nb == 0); spin_lock_irqsave( &(nb->state_lock), iflags ); - + if(likely(nb->state == NEIGHBOR_STATE_ACTIVE) && time_after(time, nb->state_time.last_roundtrip)) nb->state_time.last_roundtrip = time; - + spin_unlock_irqrestore( &(nb->state_lock), iflags ); } @@ -303,12 +303,12 @@ static void reset_stall_conns(struct neighbor *nb, start: mutex_lock(&(nb->conn_list_lock)); currlh = nb->snd_conn_list.next; - + while (currlh != &(nb->snd_conn_list)) { struct conn *rconn = container_of(currlh, struct conn, target.out.nb_list); BUG_ON(rconn->targettype != TARGET_OUT); - + if (resetall || stall_time_ms >= rconn->target.out.stall_timeout_ms) { /** @@ -321,7 +321,7 @@ start: } currlh = currlh->next; } - + BUG_ON(list_empty(&(nb->snd_conn_list)) && nb->num_send_conns != 0); mutex_unlock(&(nb->conn_list_lock)); } @@ -334,14 +334,14 @@ static void stall_timer(struct neighbor *nb, int fromtimer) __u8 nbstate; int resetall; - + unsigned long iflags; spin_lock_irqsave( &(nb->state_lock), iflags ); stall_time_ms = jiffies_to_msecs(jiffies - nb->state_time.last_roundtrip); nbstate = nb->state; - + if (unlikely(nbstate != NEIGHBOR_STATE_STALLED)) nb->str_timer_pending = 0; spin_unlock_irqrestore( &(nb->state_lock), iflags ); @@ -352,22 +352,22 @@ static void stall_timer(struct neighbor *nb, int fromtimer) } resetall = (stall_time_ms > NB_KILL_TIME_MS); - + /*if(resetall) printk(KERN_ERR "reset_all");*/ - + reset_stall_conns(nb, stall_time_ms, resetall); - + if (resetall) { spin_lock_irqsave( &(nb->state_lock), iflags ); nb->state = NEIGHBOR_STATE_KILLED; spin_unlock_irqrestore( &(nb->state_lock), iflags ); - + list_del(&(nb->nb_list)); kref_put(&(nb->ref), neighbor_free); /* nb_list */ - + kref_put(&(nb->ref), neighbor_free); /* stall_timer */ - + } else { if (fromtimer == 0) { int pending; @@ -404,7 +404,7 @@ int get_neigh_state(struct neighbor *nb) BUG_ON(nb == 0); spin_lock_irqsave( &(nb->state_lock), iflags ); - + if (unlikely(likely(nb->state == NEIGHBOR_STATE_ACTIVE) && unlikely( time_after_eq(jiffies, nb->state_time.last_roundtrip + msecs_to_jiffies(NB_STALL_TIME_MS)) && @@ -412,23 +412,23 @@ int get_neigh_state(struct neighbor *nb) nb->state = NEIGHBOR_STATE_STALLED; switchedtostalled = 1; } - + ret = nb->state; - + spin_unlock_irqrestore( &(nb->state_lock), iflags ); - + if (unlikely(switchedtostalled)) { /*printk(KERN_ERR "switched to stalled");*/ stall_timer(nb, 0); } - + return ret; } static struct ping_cookie *find_cookie(struct neighbor *nb, __u32 cookie) { int i; - + for(i=0;icookies[i].cookie == cookie) return &(nb->cookies[i]); @@ -440,19 +440,19 @@ void ping_resp(struct neighbor *nb, __u32 cookie, __u32 respdelay) { struct ping_cookie *c; int i; - + unsigned long cookie_sendtime; __s64 newlatency; - + unsigned long iflags; - + mutex_lock(&(nb->pingcookie_lock)); - + c = find_cookie(nb, cookie); - + if (unlikely(c == 0)) goto out; - + cookie_sendtime = c->time; newlatency = ((((__s64) ((__u32)atomic_read(&(nb->latency)))) * 15 + @@ -461,12 +461,12 @@ void ping_resp(struct neighbor *nb, __u32 cookie, __u32 respdelay) newlatency = 0; if (unlikely(newlatency > (((__s64)256)*256*256*256 - 1))) newlatency = 
((__s64)256)*256*256*256 - 1; - + atomic_set(&(nb->latency), (__u32) newlatency); c->cookie = 0; nb->ping_intransit--; - + for(i=0;icookies[i].cookie != 0 && time_before(nb->cookies[i].time, c->time)) { @@ -478,13 +478,13 @@ void ping_resp(struct neighbor *nb, __u32 cookie, __u32 respdelay) } } } - + spin_lock_irqsave( &(nb->state_lock), iflags ); - + if (unlikely(nb->state == NEIGHBOR_STATE_INITIAL || nb->state == NEIGHBOR_STATE_STALLED)) { nb->ping_success++; - + if (nb->state == NEIGHBOR_STATE_INITIAL) { __u64 jiffies64 = get_jiffies_64(); if (nb->state_time.last_state_change == 0) @@ -493,7 +493,7 @@ void ping_resp(struct neighbor *nb, __u32 cookie, __u32 respdelay) msecs_to_jiffies(INITIAL_TIME_MS))) goto out2; } - + if (nb->ping_success >= PING_SUCCESS_CNT) { /*if (nb->state == NEIGHBOR_STATE_INITIAL) printk(KERN_ERR "switched from initial to active"); @@ -510,7 +510,7 @@ void ping_resp(struct neighbor *nb, __u32 cookie, __u32 respdelay) out2: spin_unlock_irqrestore( &(nb->state_lock), iflags ); - + out: mutex_unlock(&(nb->pingcookie_lock)); } @@ -519,11 +519,11 @@ __u32 add_ping_req(struct neighbor *nb) { struct ping_cookie *c; __u32 i; - + __u32 cookie; - + mutex_lock(&(nb->pingcookie_lock)); - + for (i=0;icookies[i].cookie == 0) goto found; @@ -541,18 +541,38 @@ found: if (unlikely(nb->lastcookie == 0)) nb->lastcookie++; c->cookie = nb->lastcookie; - + nb->ping_intransit++; - + cookie = c->cookie; - + nb->last_ping_time = jiffies; - + mutex_unlock(&(nb->pingcookie_lock)); return cookie; } +void unadd_ping_req(struct neighbor *nb, __u32 cookie) +{ + int i; + + if (cookie == 0) + return; + + mutex_lock(&(nb->pingcookie_lock)); + + for (i=0;icookies[i].cookie == cookie) { + nb->cookies[i].cookie = 0; + nb->ping_intransit--; + break; + } + } + + mutex_unlock(&(nb->pingcookie_lock)); +} + static int neighbor_idle(struct neighbor *nb) { int ret; @@ -607,7 +627,7 @@ int time_to_send_ping(struct neighbor *nb) forcetime = PING_FORCETIME_ACTIVEIDLE_MS; else forcetime = PING_FORCETIME_ACTIVE_MS; - + if (jiffies_to_msecs(jiffies - nb->last_ping_time) >= forcetime) rc = 2; } @@ -620,26 +640,28 @@ int time_to_send_ping(struct neighbor *nb) static void add_neighbor(struct neighbor *nb) { struct list_head *currlh = nb_list.next; - + BUG_ON((nb->addr == 0) != (nb->addrlen == 0)); - + while (currlh != &nb_list) { struct neighbor *curr = container_of(currlh, struct neighbor, nb_list); - + if (curr->addrlen == nb->addrlen && memcmp(curr->addr, nb->addr, curr->addrlen) == 0) goto already_present; currlh = currlh->next; } + /* kref_get not needed here, because the caller leaves its ref to us */ printk(KERN_ERR "add_neigh"); + list_add_tail(&(nb->nb_list), &nb_list); schedule_controlmsg_timerfunc(nb); INIT_DELAYED_WORK(&(nb->retrans_timer), retransmit_timerfunc); INIT_DELAYED_WORK(&(nb->retrans_timer_conn), retransmit_conn_timerfunc); - + if (0) { already_present: kmem_cache_free(nb_slab, nb); @@ -671,37 +693,37 @@ static int apply_announce_addaddr(struct neighbor *nb, __u32 cmd, __u32 len, char *addrtype; __u16 addrlen; char *addr; - + BUG_ON((nb->addr == 0) != (nb->addrlen == 0)); - + if (nb->addr != 0) return 0; - + if (len < 4) return 0; addrtypelen = be16_to_cpu(*((__u16 *) cmddata)); cmddata += 2; len -= 2; - + if (len < 2) return 0; - + addrlen = be16_to_cpu(*((__u16 *) cmddata)); cmddata += 2; len -= 2; - + addrtype = cmddata; cmddata += addrtypelen; len -= addrtypelen; - + addr = cmddata; cmddata += addrlen; len -= addrlen; - + if (len < 0) return 0; - + if (get_addrtype(addrtypelen, addrtype) 
!= ADDRTYPE_ID) return 0; @@ -729,31 +751,31 @@ static void apply_announce_cmds(char *msg, __u32 len, struct net_device *dev, char *source_hw) { struct neighbor *nb = alloc_neighbor(GFP_KERNEL); - + if (unlikely(nb == 0)) return; - + while (len >= 8) { __u32 cmd; __u32 cmdlen; - + cmd = be32_to_cpu(*((__u32 *) msg)); msg += 4; len -= 4; cmdlen = be32_to_cpu(*((__u32 *) msg)); msg += 4; len -= 4; - + BUG_ON(cmdlen > len); - + apply_announce_cmd(nb, cmd, cmdlen, msg); - + msg += cmdlen; len -= cmdlen; } - + BUG_ON(len != 0); - + memcpy(nb->mac, source_hw, MAX_ADDR_LEN); dev_hold(dev); @@ -766,22 +788,22 @@ static int check_announce_cmds(char *msg, __u32 len) while (len >= 8) { __u32 cmd; __u32 cmdlen; - + cmd = be32_to_cpu(*((__u32 *) msg)); msg += 4; len -= 4; cmdlen = be32_to_cpu(*((__u32 *) msg)); msg += 4; len -= 4; - + /* malformated packet */ if (unlikely(cmdlen > len)) return 1; - + msg += cmdlen; len -= cmdlen; } - + if (unlikely(len != 0)) return 1; @@ -795,7 +817,7 @@ static void parse_announce(char *msg, __u32 len, struct net_device *dev, __u32 max_announce_version; __u32 min_cor_version; __u32 max_cor_version; - + if (unlikely(len < 16)) return; @@ -811,7 +833,7 @@ static void parse_announce(char *msg, __u32 len, struct net_device *dev, max_cor_version = be32_to_cpu(*((__u32 *) msg)); msg += 4; len -= 4; - + if (min_announce_version != 0) return; if (min_cor_version != 0) @@ -898,36 +920,36 @@ free: static int _rcv_announce(struct sk_buff *skb, struct announce_in *ann) { struct skb_procstate *ps = skb_pstate(skb); - + __u32 offset = ps->funcstate.announce.offset; __u32 len = skb->len; - + __u32 curroffset = 0; __u32 prevoffset = 0; __u32 prevlen = 0; - + struct sk_buff *curr = ann->skbs.next; - + if (unlikely(len + offset > ann->total_size)) { /* invalid header */ kfree_skb(skb); return 0; } - + /** * Try to find the right place to insert in the sorted list. This * means to process the list until we find a skb which has a greater * offset, so we can insert before it to keep the sort order. However, * this is complicated by the fact that the new skb must not be inserted * between 2 skbs if there is no data missing in between. So the loop - * runs has to keep running until there is either a gap to insert or + * runs has to keep running until there is either a gap to insert or * we see that this data has already been received. */ while ((void *) curr != (void *) &(ann->skbs)) { struct skb_procstate *currps = skb_pstate(skb); - + curroffset = currps->funcstate.announce.offset; - + if (curroffset > offset && (prevoffset + prevlen) < curroffset) break; @@ -941,7 +963,7 @@ static int _rcv_announce(struct sk_buff *skb, struct announce_in *ann) return 0; } } - + /** * Calculate how much data was really received, by substracting * the bytes we already have. 
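_rcv_announce() above inserts each announce fragment into a list sorted by offset and must only count bytes that were not already received. The overlap arithmetic in isolation, as a userspace sketch (struct frag stands in for the sk_buff queue; trimming against the *following* fragment works the same way in the kernel code, which clamps len to curroffset - offset):

#include <stdint.h>

struct frag {
        uint32_t off, len;
};

/* Returns how many new bytes 'cur' contributes after trimming against
 * the preceding fragment, or 0 if it only duplicates received data;
 * 'cur' is adjusted in place. */
static uint32_t trim_against_prev(struct frag prev, struct frag *cur)
{
        uint32_t prev_end = prev.off + prev.len;

        if (cur->off + cur->len <= prev_end)
                return 0;               /* nothing new at all */
        if (cur->off < prev_end) {      /* partial overlap: keep the tail */
                cur->len -= prev_end - cur->off;
                cur->off = prev_end;
        }
        return cur->len;
}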
@@ -950,11 +972,11 @@ static int _rcv_announce(struct sk_buff *skb, struct announce_in *ann) len -= (prevoffset + prevlen) - offset; offset = prevoffset + prevlen; } - + if (unlikely((void *) curr != (void *) &(ann->skbs) && (offset + len) > curroffset)) len = curroffset - offset; - + ann->received_size += len; BUG_ON(ann->received_size > ann->total_size); __skb_queue_before(&(ann->skbs), curr, skb); @@ -974,29 +996,29 @@ void rcv_announce(struct sk_buff *skb) struct announce_in *curr = 0; struct announce_in *leastactive = 0; __u32 list_size = 0; - + __u32 announce_proto_version = pull_u32(skb, 1); __u32 packet_version = pull_u32(skb, 1); __u32 total_size = pull_u32(skb, 1); - + char source_hw[MAX_ADDR_LEN]; memset(source_hw, 0, MAX_ADDR_LEN); if (skb->dev->header_ops != 0 && skb->dev->header_ops->parse != 0) skb->dev->header_ops->parse(skb, source_hw); - + ps->funcstate.announce.offset = pull_u32(skb, 1); - + if (total_size > 8192) goto discard; - + mutex_lock(&(neighbor_operation_lock)); - + if (announce_proto_version != 0) goto discard; - + curr = (struct announce_in *) announce_list.next; - + while (((struct list_head *) curr) != &(announce_list)) { list_size++; if (curr->dev == skb->dev && @@ -1005,36 +1027,36 @@ void rcv_announce(struct sk_buff *skb) curr->packet_version == packet_version && curr->total_size == total_size) goto found; - + if (leastactive == 0 || curr->last_received_packet < leastactive->last_received_packet) leastactive = curr; - + curr = (struct announce_in *) curr->lh.next; } - + if (list_size >= 128) { BUG_ON(leastactive == 0); curr = leastactive; - + curr->last_received_packet = get_jiffies_64(); - + while (!skb_queue_empty(&(curr->skbs))) { struct sk_buff *skb2 = skb_dequeue(&(curr->skbs)); kfree_skb(skb2); } - + dev_put(curr->dev); } else { curr = kmem_cache_alloc(announce_in_slab, GFP_KERNEL); if (curr == 0) goto discard; - + skb_queue_head_init(&(curr->skbs)); list_add_tail((struct list_head *) curr, &announce_list); } - + curr->packet_version = packet_version; curr->total_size = total_size; curr->received_size = 0; @@ -1042,25 +1064,25 @@ void rcv_announce(struct sk_buff *skb) curr->dev = skb->dev; dev_hold(curr->dev); memcpy(curr->source_hw, source_hw, MAX_ADDR_LEN); - + found: if (_rcv_announce(skb, curr)) { list_del((struct list_head *) curr); dev_put(curr->dev); kmem_cache_free(announce_in_slab, curr); } - + if (0) { discard: kfree_skb(skb); } - + mutex_unlock(&(neighbor_operation_lock)); } -struct announce { +struct announce{ struct kref ref; - + __u32 packet_version; char *announce_msg; __u32 announce_msg_len; @@ -1068,20 +1090,7 @@ struct announce { struct announce *last_announce; -struct announce_data { - struct delayed_work announce_work; - - struct net_device *dev; - - struct announce *ann; - - struct list_head lh; - - __u32 curr_announce_msg_offset; - __u64 scheduled_announce_timer; -}; - -static void _splitsend_announce(struct announce_data *ann) +static int send_announce_chunk(struct announce_data *ann) { struct sk_buff *skb; __u32 packet_size = 256; @@ -1091,52 +1100,70 @@ static void _splitsend_announce(struct announce_data *ann) __u32 overhead = 17 + headroom; char *header; char *ptr; - + int rc = 0; + if (remainingdata < packet_size) packet_size = remainingdata; - + skb = alloc_skb(packet_size + overhead, GFP_KERNEL); - if (unlikely(0 == skb)) - return; - + if (unlikely(skb == 0)) + return 0; + skb->protocol = htons(ETH_P_COR); skb->dev = ann->dev; skb_reserve(skb, headroom); - + if(unlikely(dev_hard_header(skb, ann->dev, ETH_P_COR, 
ann->dev->broadcast, ann->dev->dev_addr, skb->len) < 0)) goto out_err; - + skb_reset_network_header(skb); - + header = skb_put(skb, 17); if (unlikely(header == 0)) goto out_err; - + header[0] = PACKET_TYPE_ANNOUNCE; - + put_u32(header + 1, 0, 1); /* announce proto version */ put_u32(header + 5, ann->ann->packet_version, 1); /* packet version */ put_u32(header + 9, ann->ann->announce_msg_len, 1); /* total size */ put_u32(header + 13, ann->curr_announce_msg_offset, 1); /* offset */ - + ptr = skb_put(skb, packet_size); if (unlikely(ptr == 0)) goto out_err; - - memcpy(ptr, ann->ann->announce_msg + ann->curr_announce_msg_offset, packet_size); - dev_queue_xmit(skb); - - ann->curr_announce_msg_offset += packet_size; - - if (ann->curr_announce_msg_offset == ann->ann->announce_msg_len) - ann->curr_announce_msg_offset = 0; - - if (0) { + + memcpy(ptr, ann->ann->announce_msg + ann->curr_announce_msg_offset, + packet_size); + + rc = dev_queue_xmit(skb); + + if (rc != 0) { + qos_enqueue(ann->dev, &(ann->rb), QOS_CALLER_ANNOUNCE); + } else { + ann->curr_announce_msg_offset += packet_size; + + if (ann->curr_announce_msg_offset == ann->ann->announce_msg_len) + ann->curr_announce_msg_offset = 0; + } + + if (0) { out_err: if (skb != 0) kfree_skb(skb); } + + return rc; +} + +int send_announce_qos(struct announce_data *ann) +{ + int rc; + mutex_lock(&(neighbor_operation_lock)); + rc = send_announce_chunk(ann); + mutex_unlock(&(neighbor_operation_lock)); + return rc; } static void announce_free(struct kref *ref) @@ -1146,22 +1173,29 @@ static void announce_free(struct kref *ref) kfree(ann); } -static void splitsend_announce(struct work_struct *work) +void announce_data_free(struct kref *ref) +{ + struct announce_data *ann = container_of(ref, struct announce_data, + ref); + if (ann->ann != 0) + kref_put(&(ann->ann->ref), announce_free); + kfree(ann); +} + +static void send_announce(struct work_struct *work) { struct announce_data *ann = container_of(to_delayed_work(work), struct announce_data, announce_work); int reschedule = 0; - + mutex_lock(&(neighbor_operation_lock)); - + if (unlikely(ann->dev == 0)) goto out; - reschedule = 1; - + if (unlikely(ann->ann == 0 && last_announce == 0)) goto out; - if (ann->curr_announce_msg_offset == 0 && unlikely(ann->ann != last_announce)) { if (ann->ann != 0) @@ -1170,22 +1204,23 @@ static void splitsend_announce(struct work_struct *work) kref_get(&(ann->ann->ref)); } - _splitsend_announce(ann); + send_announce_chunk(ann); + out: mutex_unlock(&(neighbor_operation_lock)); - + if (reschedule) { __u64 jiffies = get_jiffies_64(); int delay; - + ann->scheduled_announce_timer += msecs_to_jiffies( ANNOUNCE_SEND_PACKETINTELVAL_MS); - + delay = ann->scheduled_announce_timer - jiffies; if (delay < 0) delay = 0; - - INIT_DELAYED_WORK(&(ann->announce_work), splitsend_announce); + + INIT_DELAYED_WORK(&(ann->announce_work), send_announce); schedule_delayed_work(&(ann->announce_work), delay); } } @@ -1193,59 +1228,62 @@ out: static struct announce_data *get_announce_by_netdev(struct net_device *dev) { struct list_head *lh = announce_out_list.next; - + while (lh != &announce_out_list) { struct announce_data *curr = (struct announce_data *)( ((char *) lh) - offsetof(struct announce_data, lh)); - + if (curr->dev == dev) return curr; } - + return 0; } -static void announce_sent_adddev(struct net_device *dev) +static void announce_send_adddev(struct net_device *dev) { struct announce_data *ann; - + ann = kmalloc(sizeof(struct announce_data), GFP_KERNEL); - + if (unlikely(ann == 0)) { 
printk(KERN_ERR "cor cannot allocate memory for sending " "announces"); return; } - + memset(ann, 0, sizeof(struct announce_data)); - + + kref_init(&(ann->ref)); + dev_hold(dev); ann->dev = dev; - + mutex_lock(&(neighbor_operation_lock)); list_add_tail(&(ann->lh), &announce_out_list); mutex_unlock(&(neighbor_operation_lock)); - + ann->scheduled_announce_timer = get_jiffies_64(); - INIT_DELAYED_WORK(&(ann->announce_work), splitsend_announce); - schedule_delayed_work(&(ann->announce_work), 1); + INIT_DELAYED_WORK(&(ann->announce_work), send_announce); + schedule_delayed_work(&(ann->announce_work), 1); } -static void announce_sent_rmdev(struct net_device *dev) +static void announce_send_rmdev(struct net_device *dev) { struct announce_data *ann; - + mutex_lock(&(neighbor_operation_lock)); - + ann = get_announce_by_netdev(dev); - + if (ann == 0) goto out; - + dev_put(ann->dev); ann->dev = 0; - + + kref_put(&(ann->ref), announce_data_free); out: mutex_unlock(&(neighbor_operation_lock)); } @@ -1254,13 +1292,18 @@ int netdev_notify_func(struct notifier_block *not, unsigned long event, void *ptr) { struct net_device *dev = (struct net_device *) ptr; - + int rc; + switch(event){ case NETDEV_UP: - announce_sent_adddev(dev); + rc = create_queue(dev); + if (rc == 1) + return 1; + announce_send_adddev(dev); break; case NETDEV_DOWN: - announce_sent_rmdev(dev); + destroy_queue(dev); + announce_send_rmdev(dev); break; case NETDEV_REBOOT: case NETDEV_CHANGE: @@ -1276,35 +1319,35 @@ int netdev_notify_func(struct notifier_block *not, unsigned long event, default: return 1; } - + return 0; } static int set_announce(char *msg, __u32 len) { struct announce *ann = kmalloc(sizeof(struct announce), GFP_KERNEL); - + if (unlikely(ann == 0)) { kfree(msg); return 1; } - + memset(ann, 0, sizeof(struct announce)); - + ann->announce_msg = msg; ann->announce_msg_len = len; - + kref_init(&(ann->ref)); - + mutex_lock(&(neighbor_operation_lock)); - + if (last_announce != 0) { ann->packet_version = last_announce->packet_version + 1; kref_put(&(last_announce->ref), announce_free); } - + last_announce = ann; - + mutex_unlock(&(neighbor_operation_lock)); return 0; @@ -1313,18 +1356,18 @@ static int set_announce(char *msg, __u32 len) static int generate_announce(void) { __u32 addrtypelen = strlen(addrtype); - + __u32 hdr_len = 16; __u32 cmd_hdr_len = 8; __u32 cmd_len = 2 + 2 + addrtypelen + addrlen; - + __u32 len = hdr_len + cmd_hdr_len + cmd_len; __u32 offset = 0; - + char *msg = kmalloc(len, GFP_KERNEL); if (unlikely(msg == 0)) return 1; - + put_u32(msg + offset, 0, 1); /* min_announce_proto_version */ offset += 4; put_u32(msg + offset, 0, 1); /* max_announce_proto_version */ @@ -1333,57 +1376,57 @@ static int generate_announce(void) offset += 4; put_u32(msg + offset, 0, 1); /* max_cor_proto_version */ offset += 4; - - + + put_u32(msg + offset, NEIGHCMD_ADDADDR, 1); /* command */ offset += 4; put_u32(msg + offset, cmd_len, 1); /* command length */ offset += 4; - + /* addrtypelen, addrlen */ put_u16(msg + offset, addrtypelen, 1); offset += 2; put_u16(msg + offset, addrlen, 1); offset += 2; - + /* addrtype, addr */ memcpy(msg + offset, addrtype, addrtypelen); offset += addrtypelen; memcpy(msg + offset, addr, addrlen); offset += addrlen; - + BUG_ON(offset != len); - + return set_announce(msg, len); } int __init cor_neighbor_init(void) { addrlen = 16; - + addr = kmalloc(addrlen, GFP_KERNEL); if (unlikely(addr == 0)) goto error_free2; - + get_random_bytes(addr, addrlen); - + nb_slab = kmem_cache_create("cor_neighbor", sizeof(struct 
neighbor), 8, 0, 0); announce_in_slab = kmem_cache_create("cor_announce_in", sizeof(struct announce_in), 8, 0, 0); - + if (unlikely(generate_announce())) goto error_free1; - + memset(&netdev_notify, 0, sizeof(netdev_notify)); netdev_notify.notifier_call = netdev_notify_func; register_netdevice_notifier(&netdev_notify); - + return 0; - + error_free1: kfree(addr); - + error_free2: return -ENOMEM; } diff --git a/net/cor/rcv.c b/net/cor/rcv.c index 34ebc62188d..665c07141e5 100644 --- a/net/cor/rcv.c +++ b/net/cor/rcv.c @@ -40,14 +40,14 @@ void drain_ooo_queue(struct conn *rconn) BUG_ON(SOURCE_IN != rconn->sourcetype); skb = rconn->source.in.reorder_queue.next; - + while ((void *) skb != (void *) &(rconn->source.in.reorder_queue)) { struct skb_procstate *ps = skb_pstate(skb); int drop; - + if (rconn->source.in.next_seqno != ps->funcstate.rcv2.seqno) break; - + drop = receive_skb(rconn, skb); if (drop) break; @@ -56,7 +56,7 @@ void drain_ooo_queue(struct conn *rconn) rconn->source.in.ooo_packets--; atomic_dec(&(rconn->source.in.nb->ooo_packets)); atomic_dec(&ooo_packets); - + rconn->source.in.next_seqno += skb->len; } } @@ -66,38 +66,38 @@ static int _conn_rcv_ooo(struct conn *rconn, struct sk_buff *skb) struct skb_procstate *ps = skb_pstate(skb); struct sk_buff_head *reorder_queue = &(rconn->source.in.reorder_queue); struct sk_buff *curr = reorder_queue->next; - + long ooo; - + rconn->source.in.ooo_packets++; if (rconn->source.in.ooo_packets > MAX_TOTAL_OOO_PER_CONN) goto drop_ooo3; - + ooo = atomic_inc_return(&(rconn->source.in.nb->ooo_packets)); if (ooo > MAX_TOTAL_OOO_PER_NEIGH) goto drop_ooo2; - + ooo = atomic_inc_return(&ooo_packets); if (ooo > MAX_TOTAL_OOO_PACKETS) goto drop_ooo1; - - + + while (1) { struct skb_procstate *ps2 = skb_pstate(curr); - + if ((void *) curr == (void *) reorder_queue) { skb_queue_tail(reorder_queue, skb); break; } - + if (ps->funcstate.rcv2.seqno > ps2->funcstate.rcv2.seqno) { skb_insert(curr, skb, reorder_queue); break; } - + curr = curr->next; } - + if (0) { drop_ooo1: atomic_dec(&ooo_packets); @@ -105,10 +105,10 @@ drop_ooo2: atomic_dec(&(rconn->source.in.nb->ooo_packets)); drop_ooo3: rconn->source.in.ooo_packets--; - + return 1; } - + return 0; } @@ -117,42 +117,42 @@ static void _conn_rcv(struct conn *rconn, struct sk_buff *skb) struct skb_procstate *ps = skb_pstate(skb); struct control_msg_out *cm = alloc_control_msg(rconn->source.in.nb, ACM_PRIORITY_MED); - + int in_order; int drop = 1; - + __u32 len = skb->len; - + BUG_ON(rconn->sourcetype != SOURCE_IN); - + if (unlikely(cm == 0)) { kfree_skb(skb); return; } - + mutex_lock(&(rconn->rcv_lock)); - + in_order = (rconn->source.in.next_seqno == ps->funcstate.rcv2.seqno); - + if (in_order == 0) { drop = _conn_rcv_ooo(rconn, skb); } else { rconn->source.in.next_seqno += skb->len; drop = receive_skb(rconn, skb); } - + if (drop) { kfree_skb(skb); free_control_msg(cm); goto out; } - + #warning todo set window if (in_order == 0) { send_ack_conn_ooo(cm, rconn->reversedir->target.out.conn_id, rconn->source.in.next_seqno, enc_window(65536), ps->funcstate.rcv2.seqno, len); - } else { + } else { drain_ooo_queue(rconn); send_ack_conn(cm, rconn->reversedir->target.out.conn_id, rconn->source.in.next_seqno, enc_window(65536)); @@ -166,11 +166,11 @@ static void conn_rcv(struct sk_buff *skb, __u32 conn_id, __u32 seqno) { struct conn *rconn; struct skb_procstate *ps = skb_pstate(skb); - + ps->funcstate.rcv2.seqno = seqno; rconn = get_conn(conn_id); - + if (unlikely(rconn == 0)) { printk(KERN_ERR "unknown conn_id when 
receiving: %d", conn_id); kfree_skb(skb); @@ -192,33 +192,33 @@ static void rcv_data(struct sk_buff *skb) { __u32 conn_id; __u32 seqno; - + char *connid_p = cor_pull_skb(skb, 4); char *seqno_p = cor_pull_skb(skb, 4); - - __u8 rand; - + + /* __u8 rand; */ + ((char *)&conn_id)[0] = connid_p[0]; ((char *)&conn_id)[1] = connid_p[1]; ((char *)&conn_id)[2] = connid_p[2]; ((char *)&conn_id)[3] = connid_p[3]; - + ((char *)&seqno)[0] = seqno_p[0]; ((char *)&seqno)[1] = seqno_p[1]; ((char *)&seqno)[2] = seqno_p[2]; ((char *)&seqno)[3] = seqno_p[3]; - + conn_id = be32_to_cpu(conn_id); seqno = be32_to_cpu(seqno); - - get_random_bytes(&rand, 1); + + /* get_random_bytes(&rand, 1); if (rand < 64) { printk(KERN_ERR "drop %d %d %d %d %d", conn_id, seqno_p[0], seqno_p[1], seqno_p[2], seqno_p[3]); goto drop; - } - + } */ + if (conn_id == 0) { struct neighbor *nb = get_neigh_by_mac(skb); if (unlikely(nb == 0)) @@ -228,7 +228,7 @@ static void rcv_data(struct sk_buff *skb) } else { conn_rcv(skb, conn_id, seqno); } - + if (0) { drop: kfree_skb(skb); @@ -244,19 +244,19 @@ static void rcv(struct work_struct *work) char *packet_type_p; atomic_dec(&packets_in_workqueue); - + packet_type_p = cor_pull_skb(skb, 1); - + if (unlikely(packet_type_p == 0)) goto drop; - + packet_type = *packet_type_p; - + if (packet_type == PACKET_TYPE_ANNOUNCE) { rcv_announce(skb); return; } - + if (unlikely(packet_type != PACKET_TYPE_DATA)) goto drop; diff --git a/net/cor/snd.c b/net/cor/snd.c index 1ca648bfe20..360d5a08aea 100644 --- a/net/cor/snd.c +++ b/net/cor/snd.c @@ -46,98 +46,340 @@ static void free_connretrans(struct kref *ref) kref_put(&(cr->rconn->ref), free_conn); } -/* static struct sk_buff * cor_dequeue(struct Qdisc *sch) -{ - struct sk_buff *ret; +DEFINE_MUTEX(queues_lock); +LIST_HEAD(queues); +struct delayed_work qos_resume_work; + +struct qos_queue { + struct list_head queue_list; + + struct net_device *dev; + + struct list_head kpackets_waiting; + struct list_head conn_retrans_waiting; + struct list_head announce_waiting; + struct list_head conns_waiting; +}; - struct cor_sched_data *q = qdisc_priv(sch); +#define RC_FLUSH_CONN_OK 0 +#define RC_FLUSH_CONN_CONG 1 +#define RC_FLUSH_CONN_CREDITS 2 - struct list_head *ln = q->conn_list.next; +static int _flush_out(struct conn *rconn, int fromqos, __u32 creditsperbyte); + +static int _resume_conns(struct qos_queue *q) +{ struct conn *best = 0; + __u64 bestcredit = 0; + __u64 secondcredit = 0; - __u64 currcost_limit = 0; - __u64 currcost = 0; + int rc; - spin_lock(&(q->lock)); - - if (!(skb_queue_empty(&(q->requeue_queue)))) { - ret = __skb_dequeue(&(q->requeue_queue)); - goto out; + struct list_head *lh = q->conns_waiting.next; + + while (lh != &(q->conns_waiting)) { + struct conn *rconn = container_of(lh, struct conn, + target.out.rb.lh); + __u64 credits; + + BUG_ON(rconn->targettype != TARGET_OUT); + + mutex_lock(&(rconn->rcv_lock)); + credits = ((__u64) rconn->target.out.credits) << 32 / + rconn->buf.read_remaining; + mutex_unlock(&(rconn->rcv_lock)); + + if (best == 0 || bestcredit < credits) { + secondcredit = bestcredit; + best = rconn; + bestcredit = credits; + } else if (secondcredit < credits) { + secondcredit = credits; + } + + lh = lh->next; + } + + mutex_lock(&(best->rcv_lock)); + rc = _flush_out(best, 1, (__u32) (secondcredit >> 32)); + + if (rc == 0) { + best->target.out.rb.in_queue = 0; + list_del(&(best->target.out.rb.lh)); + } + mutex_unlock(&(best->rcv_lock)); + + return rc; +} + +static int resume_conns(struct qos_queue *q) +{ + while 
+	while (list_empty(&(q->conns_waiting)) == 0) {
+		int rc = _resume_conns(q);
+		if (rc != 0)
+			return rc;
+	}
+	return 0;
+}
+
+static int send_retrans(struct neighbor *nb);
+
+static int _qos_resume(struct qos_queue *q, int caller)
+{
+	int rc = 0;
+
+	struct list_head *lh;
 
-	while (&(q->conn_list) != ln) {
-		__u32 max1, max2, maxcost;
-		struct conn *curr = (struct conn *)
-			(((char *) ln) - offsetof(struct conn,
-			target.out.queue_list));
-
-		BUG_ON(TARGET_OUT != curr->targettype);
-		max1 = (256 * ((__u64)curr->credits)) /
-			((__u64)curr->bytes_queued + curr->avg_rate);
-
-		max2 = (256 * ((__u64)curr->credits +
-			curr->credit_sender - curr->credit_recp)) /
-			((__u64)curr->bytes_queued + 2*curr->avg_rate);
-
-		maxcost = max((__u32) 0, min((max1), (max2)));
-
-		if (maxcost > currcost_limit) {
-			currcost = currcost_limit;
-			currcost_limit = maxcost;
-			best = curr;
+	if (caller == QOS_CALLER_KPACKET)
+		lh = &(q->kpackets_waiting);
+	else if (caller == QOS_CALLER_CONN_RETRANS)
+		lh = &(q->conn_retrans_waiting);
+	else if (caller == QOS_CALLER_ANNOUNCE)
+		lh = &(q->announce_waiting);
+	else
+		BUG();
+
+	while (list_empty(lh) == 0) {
+		struct list_head *curr = lh->next;
+		struct resume_block *rb = container_of(curr,
+				struct resume_block, lh);
+		rb->in_queue = 0;
+		list_del(curr);
+
+		mutex_unlock(&(queues_lock));
+
+		if (caller == QOS_CALLER_KPACKET) {
+			struct neighbor *nb = container_of(rb, struct neighbor,
+					rb_kp);
+			rc = resume_send_messages(nb);
+		} else if (caller == QOS_CALLER_CONN_RETRANS) {
+			struct neighbor *nb = container_of(rb, struct neighbor,
+					rb_cr);
+			rc = send_retrans(nb);
+		} else if (caller == QOS_CALLER_ANNOUNCE) {
+			struct announce_data *ann = container_of(rb,
+					struct announce_data, rb);
+			rc = send_announce_qos(ann);
+		} else {
+			BUG();
 		}
-
-		ln = ln->next;
+
+		mutex_lock(&(queues_lock));
+
+		if (rc != 0 && rb->in_queue == 0) {
+			rb->in_queue = 1;
+			list_add(curr, lh);
+		} else {
+			if (caller == QOS_CALLER_KPACKET) {
+				kref_put(&(container_of(rb, struct neighbor,
+						rb_kp)->ref), neighbor_free);
+			} else if (caller == QOS_CALLER_CONN_RETRANS) {
+				kref_put(&(container_of(rb, struct neighbor,
+						rb_cr)->ref), neighbor_free);
+			} else if (caller == QOS_CALLER_ANNOUNCE) {
+				kref_put(&(container_of(rb,
+						struct announce_data, rb)->ref),
+						announce_data_free);
+			} else {
+				BUG();
+			}
+		}
+
+		if (rc != 0)
+			break;
 	}
+	return rc;
+}
 
-	best->credits -= currcost;
+static void qos_resume(struct work_struct *work)
+{
+	struct list_head *curr;
+
+	mutex_lock(&(queues_lock));
+
+	curr = queues.next;
+	while (curr != (&queues)) {
+		struct qos_queue *q = container_of(curr,
+				struct qos_queue, queue_list);
+		int i;
 
-	ret = __skb_dequeue(&(best->target.out.queue));
-
-	if (skb_queue_empty(&(best->target.out.queue))) {
-		list_del(&(best->target.out.queue_list));
-		best->target.out.qdisc_active = 0;
+		for (i = 0; i < 4; i++) {
+			int rc;
+			if (i == 3)
+				rc = resume_conns(q);
+			else
+				rc = _qos_resume(q, i);
+
+			if (rc != 0)
+				goto congested;
+		}
+
+		curr = curr->next;
+
+		if (i == 4 && unlikely(q->dev == 0)) {
+			list_del(&(q->queue_list));
+			kfree(q);
+		}
 	}
-
-out:
-	spin_unlock(&(q->lock));
 
-	if (likely(0 != ret)) {
-		sch->qstats.backlog -= ret->len;
-		sch->q.qlen--;
+	if (0) {
congested:
+		schedule_delayed_work(&(qos_resume_work), 1);
 	}
 
-	return ret;
+	mutex_unlock(&(queues_lock));
 }
 
-static int cor_enqueue(struct sk_buff *skb, struct Qdisc *sch)
+static struct qos_queue *get_queue(struct net_device *dev)
 {
-	struct cor_sched_data *q = qdisc_priv(sch);
-	struct conn *rconn;
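+	/* must be called with queues_lock held */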
+	struct list_head *curr = queues.next;
+	while (curr != (&queues)) {
+		struct qos_queue *q = container_of(curr,
+				struct qos_queue, queue_list);
+		if (q->dev == dev)
+			return q;
+
+		curr = curr->next;
+	}
+	return 0;
+}
 
-	rconn = skb_pstate(skb)->rconn;
+int destroy_queue(struct net_device *dev)
+{
+	struct qos_queue *q;
 
-	BUG_ON(TARGET_OUT != rconn->targettype);
+	mutex_lock(&(queues_lock));
 
-	spin_lock(&(rconn->target.out.qdisc_lock));
+	q = get_queue(dev);
 
-	__skb_queue_tail(&(rconn->target.out.queue), skb);
-
-	if (unlikely(0 == rconn->target.out.qdisc_active)) {
-		spin_lock(&(q->lock));
-		list_add(&(rconn->target.out.queue_list), &(q->conn_list));
-		rconn->target.out.qdisc_active = 1;
-		spin_unlock(&(q->lock));
+	if (q == 0) {
+		mutex_unlock(&(queues_lock));
+		return 1;
+	}
+
+	q->dev = 0;
+
+	dev_put(dev);
+
+	mutex_unlock(&(queues_lock));
+
+	return 0;
+}
+
+int create_queue(struct net_device *dev)
+{
+	struct qos_queue *q = kmalloc(sizeof(struct qos_queue), GFP_KERNEL);
+
+	if (q == 0) {
+		printk(KERN_ERR "cor: unable to allocate memory for device "
+				"queue, not enabling device\n");
+		return 1;
 	}
 
-	spin_unlock(&(rconn->target.out.qdisc_lock));
+	q->dev = dev;
+	dev_hold(dev);
+
+	INIT_LIST_HEAD(&(q->kpackets_waiting));
+	INIT_LIST_HEAD(&(q->conn_retrans_waiting));
+	INIT_LIST_HEAD(&(q->announce_waiting));
+	INIT_LIST_HEAD(&(q->conns_waiting));
+
+	mutex_lock(&(queues_lock));
+	list_add(&(q->queue_list), &queues);
+	mutex_unlock(&(queues_lock));
+
+	return 0;
+}
+
+void qos_enqueue(struct net_device *dev, struct resume_block *rb, int caller)
+{
+	struct qos_queue *q;
+
+	mutex_lock(&(queues_lock));
+
+	if (rb->in_queue)
+		goto out;
+
+	q = get_queue(dev);
+	if (unlikely(q == 0))
+		goto out;
+
+	rb->in_queue = 1;
+
+	if (caller == QOS_CALLER_KPACKET) {
+		list_add(&(rb->lh), &(q->kpackets_waiting));
+		kref_get(&(container_of(rb, struct neighbor, rb_kp)->ref));
+	} else if (caller == QOS_CALLER_CONN_RETRANS) {
+		list_add(&(rb->lh), &(q->conn_retrans_waiting));
+		kref_get(&(container_of(rb, struct neighbor, rb_cr)->ref));
+	} else if (caller == QOS_CALLER_ANNOUNCE) {
+		list_add(&(rb->lh), &(q->announce_waiting));
+		kref_get(&(container_of(rb, struct announce_data, rb)->ref));
+	} else if (caller == QOS_CALLER_CONN) {
+		list_add(&(rb->lh), &(q->conns_waiting));
+	} else {
+		BUG();
+	}
+
out:
+	mutex_unlock(&(queues_lock));
+}
+
+void qos_enqueue_kpacket(struct neighbor *nb)
+{
+	qos_enqueue(nb->dev, &(nb->rb_kp), QOS_CALLER_KPACKET);
+}
+
+static void qos_enqueue_conn_retrans(struct neighbor *nb)
+{
+	qos_enqueue(nb->dev, &(nb->rb_cr), QOS_CALLER_CONN_RETRANS);
+}
+
+static void qos_enqueue_conn(struct conn *rconn)
+{
+	BUG_ON(rconn->targettype != TARGET_OUT);
+	qos_enqueue(rconn->target.out.nb->dev, &(rconn->target.out.rb),
+			QOS_CALLER_CONN);
+}
+
+static int may_send_conn_retrans(struct neighbor *nb)
+{
+	struct qos_queue *q;
+	int rc = 0;
+
+	mutex_lock(&(queues_lock));
+
+	q = get_queue(nb->dev);
+	if (unlikely(q == 0))
+		goto out;
+
+	rc = (list_empty(&(q->kpackets_waiting)));
+
out:
+	mutex_unlock(&(queues_lock));
+
+	return rc;
+}
+
+static int may_send_conn(struct conn *rconn)
+{
+	struct qos_queue *q;
+	int rc = 0;
+
+	mutex_lock(&(queues_lock));
+
+	q = get_queue(rconn->target.out.nb->dev);
+	if (unlikely(q == 0))
+		goto out;
+
+	rc = (list_empty(&(q->kpackets_waiting)) &&
+			list_empty(&(q->conn_retrans_waiting)) &&
+			list_empty(&(q->announce_waiting)) &&
+			list_empty(&(q->conns_waiting)));
 
-	sch->bstats.bytes += skb->len;
-	sch->bstats.packets++;
-	sch->q.qlen++;
+out:
+	mutex_unlock(&(queues_lock));
+
+	return rc;
+}
 
-	return NET_XMIT_SUCCESS;
-} */
 
 struct sk_buff *create_packet(struct neighbor *nb, int 
size, gfp_t alloc_flags, __u32 conn_id, __u32 seqno) @@ -148,10 +390,10 @@ struct sk_buff *create_packet(struct neighbor *nb, int size, ret = alloc_skb(size + 9 + LL_ALLOCATED_SPACE(nb->dev), alloc_flags); if (unlikely(0 == ret)) return 0; - + ret->protocol = htons(ETH_P_COR); ret->dev = nb->dev; - + skb_reserve(ret, LL_RESERVED_SPACE(nb->dev)); if(unlikely(dev_hard_header(ret, nb->dev, ETH_P_COR, nb->mac, nb->dev->dev_addr, ret->len) < 0)) @@ -160,10 +402,10 @@ struct sk_buff *create_packet(struct neighbor *nb, int size, dest = skb_put(ret, 9); BUG_ON(0 == dest); - + dest[0] = PACKET_TYPE_DATA; dest += 1; - + put_u32(dest, conn_id, 1); dest += 4; put_u32(dest, seqno, 1); @@ -186,7 +428,7 @@ static struct conn_retrans *readd_conn_retrans(struct conn_retrans *cr, unsigned long iflags; struct conn_retrans *ret = 0; - + spin_lock_irqsave( &(nb->retrans_lock), iflags ); if (unlikely(cr->ackrcvd)) { @@ -250,10 +492,11 @@ void cancel_retrans(struct conn *rconn) spin_unlock_irqrestore( &(nb->retrans_lock), iflags ); } -static void send_retrans(struct neighbor *nb, struct conn_retrans *cr) +static int _send_retrans(struct neighbor *nb, struct conn_retrans *cr) { int targetmss = mss(nb); int dontsend; + int queuefull = 0; mutex_lock(&(cr->rconn->rcv_lock)); @@ -271,6 +514,11 @@ static void send_retrans(struct neighbor *nb, struct conn_retrans *cr) struct sk_buff *skb; char *dst; struct conn_retrans *cr2; + int rc; + + if (may_send_conn_retrans(nb) == 0) + goto qos_enqueue; + skb = create_packet(nb, targetmss, GFP_KERNEL, cr->rconn->target.out.conn_id, cr->seqno); if (unlikely(skb == 0)) { @@ -288,10 +536,26 @@ static void send_retrans(struct neighbor *nb, struct conn_retrans *cr) dst = skb_put(skb, targetmss); databuf_pullold(&(cr->rconn->buf), cr->seqno, dst, targetmss); - dev_queue_xmit(skb); + rc = dev_queue_xmit(skb); + + if (rc != 0) { + unsigned long iflags; + + spin_lock_irqsave( &(nb->retrans_lock), iflags ); + if (unlikely(cr->ackrcvd)) { + dontsend = 1; + } else { + list_del(&(cr->timeout_list)); + list_add(&(cr->timeout_list), + &(nb->retrans_list_conn)); + } + spin_unlock_irqrestore( &(nb->retrans_lock), iflags ); + if (dontsend == 0) + goto qos_enqueue; + } cr = cr2; - + if (likely(cr == 0)) goto out; } @@ -325,23 +589,29 @@ static void send_retrans(struct neighbor *nb, struct conn_retrans *cr) cr->seqno, buf, buf, cr->length); } } + + if (0) { +qos_enqueue: + qos_enqueue_conn_retrans(nb); + queuefull = 1; + } out: mutex_unlock(&(cr->rconn->rcv_lock)); kref_put(&(cr->rconn->ref), free_conn); + + return queuefull; } -void retransmit_conn_timerfunc(struct work_struct *work) +static int send_retrans(struct neighbor *nb) { unsigned long iflags; - struct neighbor *nb = container_of(to_delayed_work(work), - struct neighbor, retrans_timer_conn); - struct conn_retrans *cr = 0; - + int nbstate; - int nbput = 0; + int nbput = 1; + int queuefull = 0; spin_lock_irqsave( &(nb->state_lock), iflags ); nbstate = nb->state; @@ -349,10 +619,9 @@ void retransmit_conn_timerfunc(struct work_struct *work) while (1) { spin_lock_irqsave( &(nb->retrans_lock), iflags ); - + if (list_empty(&(nb->retrans_list_conn))) { nb->retrans_timer_conn_running = 0; - nbput = 1; break; } @@ -377,25 +646,39 @@ void retransmit_conn_timerfunc(struct work_struct *work) if (time_after(cr->timeout, jiffies)) { schedule_delayed_work(&(nb->retrans_timer_conn), cr->timeout - jiffies); + nbput = 0; break; } kref_get(&(cr->ref)); spin_unlock_irqrestore( &(nb->retrans_lock), iflags ); - send_retrans(nb, cr); + queuefull = 
_send_retrans(nb, cr); kref_put(&(cr->ref), free_connretrans); + if (queuefull) + goto out; } spin_unlock_irqrestore( &(nb->retrans_lock), iflags ); +out: if (nbput) kref_put(&(nb->ref), neighbor_free); + + return queuefull; +} + +void retransmit_conn_timerfunc(struct work_struct *work) +{ + struct neighbor *nb = container_of(to_delayed_work(work), + struct neighbor, retrans_timer_conn); + + send_retrans(nb); } static struct conn_retrans *search_seqno(struct conn *rconn, __u32 seqno) { struct list_head *next = rconn->target.out.retrans_list.next; - + while (next != &(rconn->target.out.retrans_list)) { struct conn_retrans *cr = container_of(next, struct conn_retrans, conn_list); @@ -413,16 +696,16 @@ void conn_ack_rcvd(__u32 kpacket_seqno, struct conn *rconn, __u32 seqno, unsigned long iflags; struct neighbor *nb = rconn->target.out.nb; struct conn_retrans *cr = 0; - + int setwindow = 0; BUG_ON(rconn->targettype != TARGET_OUT); - + mutex_lock(&(rconn->rcv_lock)); - + if (unlikely(seqno - rconn->target.out.seqno_nextsend > 0)) goto out; - + spin_lock_irqsave( &(nb->retrans_lock), iflags ); if (likely(length == 0)) @@ -469,7 +752,7 @@ in_order: } cr = search_seqno(rconn, seqno_ooo); - + while (cr != 0) { struct list_head *next = cr->conn_list.next; struct conn_retrans *nextcr = 0; @@ -477,7 +760,7 @@ in_order: nextcr = container_of(next, struct conn_retrans, conn_list); } - + if (((__s32)(cr->seqno + cr->length - rconn->target.out.seqno_acked)) <= 0) { list_del(&(cr->timeout_list)); @@ -485,13 +768,13 @@ in_order: cr->ackrcvd = 1; kref_put(&(cr->ref), free_connretrans); } - + cr = nextcr; } - + spin_unlock_irqrestore( &(nb->retrans_lock), iflags ); databuf_ack(&(rconn->buf), rconn->target.out.seqno_acked); - + setwindow = setwindow || (seqno == rconn->target.out.seqno_acked && (kpacket_seqno - rconn->target.out.kp_windowsetseqno > 0)); @@ -509,13 +792,13 @@ static void schedule_retransmit_conn(struct conn_retrans *cr, struct conn *rconn __u32 seqno, __u32 len) { unsigned long iflags; - + struct neighbor *nb = rconn->target.out.nb; int first; - + BUG_ON(rconn->targettype != TARGET_OUT); - + memset(cr, 0, sizeof (struct conn_retrans)); cr->rconn = rconn; kref_get(&(rconn->ref)); @@ -523,14 +806,14 @@ static void schedule_retransmit_conn(struct conn_retrans *cr, struct conn *rconn cr->length = len; kref_init(&(cr->ref)); set_conn_retrans_timeout(cr); - + spin_lock_irqsave( &(nb->retrans_lock), iflags ); - + first = unlikely(list_empty(&(nb->retrans_list_conn))); list_add_tail(&(cr->timeout_list), &(nb->retrans_list_conn)); - + list_add_tail(&(cr->conn_list), &(rconn->target.out.retrans_list)); - + if (unlikely(unlikely(first) && unlikely(nb->retrans_timer_conn_running == 0))) { schedule_delayed_work(&(nb->retrans_timer_conn), @@ -542,7 +825,7 @@ static void schedule_retransmit_conn(struct conn_retrans *cr, struct conn *rconn spin_unlock_irqrestore( &(nb->retrans_lock), iflags ); } -void flush_out(struct conn *rconn) +static int _flush_out(struct conn *rconn, int fromqos, __u32 creditsperbyte) { int targetmss = mss(rconn->target.out.nb); __u32 seqno; @@ -552,15 +835,24 @@ void flush_out(struct conn *rconn) BUG_ON(rconn->targettype != TARGET_OUT); if (unlikely(rconn->target.out.conn_id == 0)) - return; + return RC_FLUSH_CONN_OK; if (unlikely(atomic_read(&(rconn->isreset)) != 0)) - return; + return RC_FLUSH_CONN_OK; + + if (fromqos == 0 && may_send_conn(rconn) == 0) + goto qos; while (rconn->buf.read_remaining >= targetmss) { struct conn_retrans *cr; struct sk_buff *skb; char *dst; + int rc; + + if 
(unlikely(creditsperbyte * targetmss >
+				rconn->target.out.credits))
+			return RC_FLUSH_CONN_CREDITS;
+
 		seqno = rconn->target.out.seqno_nextsend;
 		skb = create_packet(rconn->target.out.nb, targetmss,
 			GFP_ATOMIC, rconn->target.out.conn_id, seqno);
@@ -577,10 +869,16 @@ void flush_out(struct conn *rconn)
 
 		databuf_pull(&(rconn->buf), dst, targetmss);
 
-		schedule_retransmit_conn(cr, rconn, seqno, targetmss);
+		rc = dev_queue_xmit(skb);
+		if (rc != 0) {
+			databuf_unpull(&(rconn->buf), targetmss);
+			kmem_cache_free(connretrans_slab, cr);
+			goto qos;
+		}
+
 		rconn->target.out.credits -= creditsperbyte * targetmss;
 		rconn->target.out.seqno_nextsend += targetmss;
-		dev_queue_xmit(skb);
+		schedule_retransmit_conn(cr, rconn, seqno, targetmss);
 	}
 
 	if (rconn->buf.read_remaining > 0) {
@@ -589,6 +887,9 @@ void flush_out(struct conn *rconn)
 		__u32 len = rconn->buf.read_remaining;
 		char *buf = kmalloc(len, GFP_KERNEL);
 
+		if (unlikely(creditsperbyte * len >
+				rconn->target.out.credits)) {
+			kfree(buf);
+			return RC_FLUSH_CONN_CREDITS;
+		}
+
 		if (unlikely(buf == 0))
 			goto oom;
@@ -608,6 +909,7 @@ void flush_out(struct conn *rconn)
 		databuf_pull(&(rconn->buf), buf, len);
 
 		seqno = rconn->target.out.seqno_nextsend;
+		rconn->target.out.credits -= creditsperbyte * len;
 		rconn->target.out.seqno_nextsend += len;
 		schedule_retransmit_conn(cr, rconn, seqno, len);
 
@@ -619,10 +921,25 @@ void flush_out(struct conn *rconn)
 		wake_sender(rconn);
 
 	if (0) {
+qos:
+		printk(KERN_ERR "cor: flush_out: device congested\n");
+		if (fromqos == 0)
+			qos_enqueue_conn(rconn);
+		return RC_FLUSH_CONN_CONG;
+	}
+
+	if (0) {
oom:
#warning todo flush later
-		;
+		printk(KERN_ERR "cor: flush_out: out of memory\n");
 	}
+
+	return RC_FLUSH_CONN_OK;
+}
+
+void flush_out(struct conn *rconn)
+{
+	_flush_out(rconn, 0, 0);
 }
 
 int __init cor_snd_init(void)
@@ -632,7 +949,9 @@ int __init cor_snd_init(void)
 
 	if (unlikely(connretrans_slab == 0))
 		return 1;
-
+
+	INIT_DELAYED_WORK(&(qos_resume_work), qos_resume);
+
 	return 0;
 }
-- 
2.11.4.GIT