From 2dc68af206e0730ef3c72dd3ecb1185516385582 Mon Sep 17 00:00:00 2001 From: Michael Blizek Date: Sat, 30 Jul 2011 11:36:47 +0200 Subject: [PATCH] conn rcv_lock converted to spinlock, struct cor_sock created, kernel_packet skb_clone removed --- net/cor/common.c | 151 +++++------ net/cor/cor.h | 138 ++++++---- net/cor/cpacket_parse.c | 38 +-- net/cor/credits.c | 21 +- net/cor/forward.c | 302 ++++++++------------- net/cor/kpacket_gen.c | 149 +++++------ net/cor/kpacket_parse.c | 91 +++---- net/cor/neighbor.c | 119 ++++++--- net/cor/rcv.c | 218 ++++++--------- net/cor/snd.c | 131 ++++----- net/cor/sock.c | 697 +++++++++++++++++++++++++++++++++--------------- 11 files changed, 1105 insertions(+), 950 deletions(-) diff --git a/net/cor/common.c b/net/cor/common.c index 84490798f5e..e61280f839b 100644 --- a/net/cor/common.c +++ b/net/cor/common.c @@ -22,10 +22,9 @@ #include "cor.h" -DEFINE_MUTEX(cor_bindnodes); +DEFINE_SPINLOCK(cor_bindnodes); DEFINE_SPINLOCK(conn_free); - -DEFINE_MUTEX(connid_gen); +DEFINE_SPINLOCK(connid_gen); LIST_HEAD(openports); @@ -45,9 +44,6 @@ struct htable connid_reuse_table; atomic_t num_conns; -struct kmem_cache *bindnode_slab; -struct kmem_cache *connlistener_slab; - /* see cor.h/KP_ACK_CONN */ static const __u32 log_64_11_table[] = {0, @@ -832,7 +828,7 @@ static int connid_alloc(struct conn *src_in_ll) BUG_ON(src_in_ll->sourcetype != SOURCE_IN); - mutex_lock(&connid_gen); + spin_lock_bh(&connid_gen); for(i=0;i<16;i++) { conn_id = 0; get_random_bytes((char *) &conn_id, sizeof(conn_id)); @@ -845,14 +841,14 @@ static int connid_alloc(struct conn *src_in_ll) goto found; } - mutex_unlock(&connid_gen); + spin_unlock_bh(&connid_gen); return 1; found: src_in_ll->source.in.conn_id = conn_id; htable_insert(&connid_table, (char *) src_in_ll, conn_id); - mutex_unlock(&connid_gen); + spin_unlock_bh(&connid_gen); return 0; } @@ -934,7 +930,7 @@ int conn_init_out(struct conn *trgt_unconn_ll, struct neighbor *nb) src_none_ll->source.in.cir = kmem_cache_alloc(connid_reuse_slab, - GFP_KERNEL); + GFP_ATOMIC); if (unlikely(src_none_ll->source.in.cir == 0)) { rc = 1; goto out; @@ -1034,9 +1030,6 @@ struct conn* alloc_conn(gfp_t allocflags) kref_init(&(cn1->ref)); kref_init(&(cn2->ref)); - cn1->sockstate = SOCKSTATE_CONN; - cn2->sockstate = SOCKSTATE_CONN; - cn1->sourcetype = SOURCE_NONE; cn2->sourcetype = SOURCE_NONE; cn1->targettype = TARGET_UNCONNECTED; @@ -1045,8 +1038,8 @@ struct conn* alloc_conn(gfp_t allocflags) cn1->isreset = 0; cn2->isreset = 0; - mutex_init(&(cn1->rcv_lock)); - mutex_init(&(cn2->rcv_lock)); + spin_lock_init(&(cn1->rcv_lock)); + spin_lock_init(&(cn2->rcv_lock)); cn1->ktime_credit_update = ktime_get(); cn2->ktime_credit_update = cn1->ktime_credit_update; @@ -1065,17 +1058,16 @@ out_err0: return 0; } -static struct connlistener *get_connlistener(__be64 port) +static struct cor_sock *get_corsock(__be64 port) { struct list_head *curr = openports.next; while (curr != &openports) { - struct bindnode *currnode = ((struct bindnode *) - (((char *)curr) - offsetof(struct bindnode, lh))); - if (currnode->port == port) { - BUG_ON(currnode->owner == 0); - return currnode->owner; - } + struct cor_sock *cs = container_of(curr, struct cor_sock, + data.listener.lh); + BUG_ON(cs->type != SOCKTYPE_LISTENER); + if (cs->data.listener.port == port) + return cs; curr = curr->next; } @@ -1083,63 +1075,50 @@ static struct connlistener *get_connlistener(__be64 port) return 0; } -void close_port(struct connlistener *listener) +void close_port(struct cor_sock *cs) { - 
mutex_lock(&cor_bindnodes); + spin_lock_bh(&cor_bindnodes); - if (listener->bn != 0) { - list_del(&(listener->bn->lh)); - kmem_cache_free(bindnode_slab, listener->bn); - listener->bn = 0; - } + list_del(&(cs->data.listener.lh)); - while (list_empty(&(listener->conn_queue)) == 0) { + while (list_empty(&(cs->data.listener.conn_queue)) == 0) { struct conn *src_sock_o = container_of( - listener->conn_queue.next, + cs->data.listener.conn_queue.next, struct conn, source.sock.cl_list); list_del(&(src_sock_o->source.sock.cl_list)); reset_conn(src_sock_o); kref_put(&(src_sock_o->ref), free_conn); } - kmem_cache_free(connlistener_slab, listener); - - mutex_unlock(&cor_bindnodes); + spin_unlock_bh(&cor_bindnodes); } -struct connlistener *open_port(__be64 port) +int open_port(struct cor_sock *cs_l, __be64 port) { + int rc = 0; - struct bindnode *bn = 0; - struct connlistener *listener = 0; - - mutex_lock(&cor_bindnodes); - if (get_connlistener(port) != 0) + spin_lock_bh(&cor_bindnodes); + if (get_corsock(port) != 0) { + rc = -EADDRINUSE; goto out; + } + BUG_ON(cs_l->type != SOCKTYPE_UNCONNECTED); - bn = kmem_cache_alloc(bindnode_slab, GFP_KERNEL); - listener = kmem_cache_alloc(connlistener_slab, GFP_KERNEL); - - memset(bn, 0, sizeof(struct bindnode)); - memset(listener, 0, sizeof(struct connlistener)); - - bn->owner = listener; - bn->port = port; + cs_l->type = SOCKTYPE_LISTENER; + cs_l->data.listener.port = port; /* kref is not actually used */ - listener->sockstate = SOCKSTATE_LISTENER; - listener->bn = bn; - mutex_init(&(listener->lock)); - INIT_LIST_HEAD(&(listener->conn_queue)); - init_waitqueue_head(&(listener->wait)); + INIT_LIST_HEAD(&(cs_l->data.listener.conn_queue)); + init_waitqueue_head(&(cs_l->data.listener.wait)); - list_add_tail((struct list_head *) &(bn->lh), &openports); + list_add_tail((struct list_head *) &(cs_l->data.listener.lh), + &openports); out: - mutex_unlock(&cor_bindnodes); + spin_unlock_bh(&cor_bindnodes); - return listener; + return rc; } /** @@ -1149,22 +1128,22 @@ out: */ int connect_port(struct conn *trtg_unconn_l, __be64 port) { - - struct connlistener *listener; + struct cor_sock *cs; int rc = 0; - mutex_lock(&cor_bindnodes); + spin_lock_bh(&cor_bindnodes); - listener = get_connlistener(port); - if (listener == 0) { + cs = get_corsock(port); + if (cs == 0) { rc = 2; goto out; } - mutex_lock(&(listener->lock)); + spin_lock_bh(&(cs->lock)); - if (unlikely(listener->queue_len >= listener->queue_maxlen)) { - if (listener->queue_maxlen <= 0) + if (unlikely(cs->data.listener.queue_len >= + cs->data.listener.queue_maxlen)) { + if (cs->data.listener.queue_maxlen <= 0) rc = 2; else rc = 3; @@ -1175,21 +1154,21 @@ int connect_port(struct conn *trtg_unconn_l, __be64 port) kref_get(&(trtg_unconn_l->reversedir->ref)); BUG_ON(trtg_unconn_l->is_client != 1); - mutex_lock(&(trtg_unconn_l->reversedir->rcv_lock)); + spin_lock_bh(&(trtg_unconn_l->reversedir->rcv_lock)); conn_init_sock_target(trtg_unconn_l); conn_init_sock_source(trtg_unconn_l->reversedir); - mutex_unlock(&(trtg_unconn_l->reversedir->rcv_lock)); + spin_unlock_bh(&(trtg_unconn_l->reversedir->rcv_lock)); list_add_tail(&(trtg_unconn_l->reversedir->source.sock.cl_list), - &(listener->conn_queue)); - listener->queue_len++; - wake_up_interruptible(&(listener->wait)); + &(cs->data.listener.conn_queue)); + cs->data.listener.queue_len++; + wake_up_interruptible(&(cs->data.listener.wait)); out2: - mutex_unlock(&(listener->lock)); + spin_unlock_bh(&(cs->lock)); out: - mutex_unlock(&cor_bindnodes); + spin_unlock_bh(&cor_bindnodes); 
return rc; } @@ -1227,9 +1206,9 @@ int connect_neigh(struct conn *trtg_unconn_l, goto discard; } - mutex_lock(&(trtg_unconn_l->reversedir->rcv_lock)); + spin_lock_bh(&(trtg_unconn_l->reversedir->rcv_lock)); ciorc = conn_init_out(trtg_unconn_l, nb); - mutex_unlock(&(trtg_unconn_l->reversedir->rcv_lock)); + spin_unlock_bh(&(trtg_unconn_l->reversedir->rcv_lock)); if (unlikely(ciorc)) { rc = 4; goto freecm; @@ -1261,7 +1240,7 @@ static int _reset_conn(struct conn *cn, int trgt_out_resetneeded) int krefput = 1; /* lock sourcetype/targettype */ - mutex_lock(&(cn->rcv_lock)); + spin_lock_bh(&(cn->rcv_lock)); if (cn->sourcetype == SOURCE_IN) { unsigned long iflags; @@ -1350,7 +1329,7 @@ static int _reset_conn(struct conn *cn, int trgt_out_resetneeded) databuf_ackdiscard(cn); - mutex_unlock(&(cn->rcv_lock)); + spin_unlock_bh(&(cn->rcv_lock)); reset_bufferusage(cn); /* source in only */ connreset_credits(cn); @@ -1369,11 +1348,11 @@ void reset_conn(struct conn *cn) int isreset2; if (cn->is_client) { - mutex_lock(&(cn->rcv_lock)); - mutex_lock(&(cn->reversedir->rcv_lock)); + spin_lock_bh(&(cn->rcv_lock)); + spin_lock_bh(&(cn->reversedir->rcv_lock)); } else { - mutex_lock(&(cn->reversedir->rcv_lock)); - mutex_lock(&(cn->rcv_lock)); + spin_lock_bh(&(cn->reversedir->rcv_lock)); + spin_lock_bh(&(cn->rcv_lock)); } BUG_ON(cn->isreset <= 1 && cn->reversedir->isreset >= 2); @@ -1388,11 +1367,11 @@ void reset_conn(struct conn *cn) cn->reversedir->isreset = 2; if (cn->is_client) { - mutex_unlock(&(cn->rcv_lock)); - mutex_unlock(&(cn->reversedir->rcv_lock)); + spin_unlock_bh(&(cn->rcv_lock)); + spin_unlock_bh(&(cn->reversedir->rcv_lock)); } else { - mutex_unlock(&(cn->reversedir->rcv_lock)); - mutex_unlock(&(cn->rcv_lock)); + spin_unlock_bh(&(cn->reversedir->rcv_lock)); + spin_unlock_bh(&(cn->rcv_lock)); } if (isreset1 >= 2) @@ -1448,12 +1427,6 @@ static int __init cor_common_init(void) offsetof(struct connid_reuse_item, htab_entry), offsetof(struct connid_reuse_item, ref)); - - bindnode_slab = kmem_cache_create("cor_bindnode", - sizeof(struct bindnode), 8, 0, 0); - connlistener_slab = kmem_cache_create("cor_connlistener", - sizeof(struct connlistener), 8, 0, 0); - atomic_set(&num_conns, 0); credits_init(); diff --git a/net/cor/cor.h b/net/cor/cor.h index 87b7ae0637d..387b6c64682 100644 --- a/net/cor/cor.h +++ b/net/cor/cor.h @@ -401,11 +401,11 @@ struct neighbor{ __u16 addrlen; struct timer_list cmsg_timer; - struct work_struct cmsg_work; - atomic_t cmsg_work_scheduled; + struct tasklet_struct cmsg_task; + atomic_t cmsg_task_scheduled; atomic_t cmsg_timer_running; - struct mutex cmsg_lock; - struct mutex send_cmsg_lock; + spinlock_t cmsg_lock; + spinlock_t send_cmsg_lock; struct list_head control_msgs_out; /** * urgent messages; These are sent even if the neighbor state is not @@ -461,7 +461,6 @@ struct neighbor{ __u16 connid_reuse_pingcnt; atomic_t kpacket_seqno; - atomic_t ooo_packets; spinlock_t credits_lock; ktime_t ktime_credit_update; @@ -488,8 +487,10 @@ struct neighbor{ * add_timer(struct timer_list * timer); */ spinlock_t retrans_lock; - struct delayed_work retrans_timer_conn; - struct delayed_work retrans_timer; + struct timer_list retrans_timer_conn; + struct timer_list retrans_timer; + struct tasklet_struct retrans_task_conn; + struct tasklet_struct retrans_task; __u8 retrans_timer_conn_running; __u8 retrans_timer_running; @@ -518,34 +519,6 @@ struct data_buf_item{ __u8 type; }; -struct connlistener; - -struct bindnode{ - struct list_head lh; - struct connlistener *owner; - __be64 port; -}; - 
-#define SOCKSTATE_LISTENER 1 -#define SOCKSTATE_CONN 2 - -struct sock_hdr { - /* The first member of connlistener/conn (see sock.c) */ - __u8 sockstate; -}; - -struct connlistener { - /* The first member has to be the same as in conn (see sock.c) */ - __u8 sockstate; - struct bindnode *bn; - struct mutex lock; - int queue_maxlen; - int queue_len; - struct list_head conn_queue; - wait_queue_head_t wait; - -}; - struct speedtracker{ __u64 speed;/* bytes*65536/jiffie */ unsigned long jiffies_last_update; @@ -649,7 +622,7 @@ struct conn{ struct kref ref; - struct mutex rcv_lock; + spinlock_t rcv_lock; ktime_t ktime_credit_update; struct list_head credit_list; @@ -677,7 +650,6 @@ struct conn{ struct connid_reuse_item *cir; __u32 conn_id; __u32 next_seqno; - __u32 ooo_packets; /* number of ack sent, not data seqno */ __u32 ack_seqno; @@ -714,7 +686,6 @@ struct conn{ struct{ struct list_head cl_list; wait_queue_head_t wait; - struct socket *sock; int flags; __u32 crate; @@ -802,19 +773,51 @@ struct skb_procstate{ union{ struct{ struct work_struct work; - }rcv; + }announce1; struct{ __u32 offset; - }announce; + }announce2; struct{ __u32 seqno; struct data_buf_item dbi; - }rcv2; + }rcv; }funcstate; }; +#define SOCKTYPE_UNCONNECTED 0 +#define SOCKTYPE_LISTENER 1 +#define SOCKTYPE_CONN 2 + +struct cor_sock { + __u8 type; /* type may not change once it is set to != UNCONNECTED */ + + spinlock_t lock; + + union { + struct { + struct list_head lh; + __be64 port; + int queue_maxlen; + int queue_len; + struct list_head conn_queue; + wait_queue_head_t wait; + } listener; + + + struct { + struct conn *src_sock; + struct conn *trgt_sock; + + struct mutex rcvbuf_lock; + struct data_buf_item *rcvitem; + __u16 rcvoffset; + + }conn; + }data; +}; + struct sock_buffertracker { struct list_head lh; @@ -877,9 +880,9 @@ extern void conn_init_sock_source(struct conn *cn); extern void conn_init_sock_target(struct conn *cn); -extern void close_port(struct connlistener *listener); +extern void close_port(struct cor_sock *cs); -extern struct connlistener *open_port(__be64 port); +extern int open_port(struct cor_sock *cs_l, __be64 port); extern int connect_port(struct conn *trtg_unconn_l, __be64 port); @@ -939,7 +942,7 @@ extern int get_next_ping_time(struct neighbor *nb); extern int force_ping(struct neighbor *nb); -extern void rcv_announce(struct sk_buff *skb); +extern int rcv_announce(struct sk_buff *skb); extern int send_announce_qos(struct announce_data *ann); @@ -969,8 +972,6 @@ extern void kernel_packet(struct neighbor *nb, struct sk_buff *skb, __u32 seqno); /* kpacket_gen.c */ -extern void controlmsg_workfunc(struct work_struct *work); - struct control_msg_out; #define ACM_PRIORITY_LOW 1 /* oom recovery easy */ @@ -984,13 +985,15 @@ extern struct control_msg_out *alloc_control_msg(struct neighbor *nb, extern void free_control_msg(struct control_msg_out *cm); -extern void retransmit_timerfunc(struct work_struct *work); +extern void retransmit_taskfunc(unsigned long arg); + +extern void retransmit_timerfunc(unsigned long arg); extern void kern_ack_rcvd(struct neighbor *nb, __u32 seqno); extern int send_messages(struct neighbor *nb, int resume); -extern void controlmsg_workfunc(struct work_struct *work); +extern void controlmsg_taskfunc(unsigned long nb); extern void controlmsg_timerfunc(unsigned long arg); @@ -1064,7 +1067,9 @@ extern struct sk_buff *create_packet(struct neighbor *nb, int size, extern void cancel_retrans(struct conn *trgt_out_l); -extern void retransmit_conn_timerfunc(struct work_struct *work); 
+extern void retransmit_conn_taskfunc(unsigned long nb); + +extern void retransmit_conn_timerfunc(unsigned long arg); extern void conn_ack_ooo_rcvd(struct neighbor *nb, __u32 conn_id, struct conn *trgt_out, __u32 seqno_ooo, __u32 length); @@ -1082,6 +1087,8 @@ extern int flush_out(struct conn *trgt_out_l, int fromqos, __u32 creditsperbyte) extern int __init cor_snd_init(void); /* forward.c */ +extern struct kmem_cache *data_buf_item_slab; + extern void databuf_init(struct conn *cn_init); extern void databuf_ackdiscard(struct conn *cn_l); @@ -1090,7 +1097,7 @@ extern void reset_seqno(struct conn *cn_l, __u32 initseqno); extern void databuf_pull(struct conn *cn_l, char *dst, int len); -extern size_t databuf_pulluser(struct conn *trgt_sock_l, struct msghdr *msg); +extern void databuf_pull_dbi(struct cor_sock *cs_rl, struct conn *trgt_sock_l); extern void databuf_unpull(struct conn *trgt_out_l, __u32 bytes); @@ -1101,8 +1108,8 @@ extern void databuf_ack(struct conn *trgt_out_l, __u32 pos); extern void databuf_ackread(struct conn *cn_l); -__s64 receive_userbuf(struct conn *src_sock_l, struct msghdr *msg, __u32 maxcpy, - __u32 maxusage); +extern __s64 receive_buf(struct conn *cn_l, char *buf, __u32 datalen, + __u32 buflen, int forcecpy); extern void receive_cpacketresp(struct conn *trtg_unconn_l, char *buf, int len); @@ -1115,8 +1122,6 @@ extern void flush_buf(struct conn *cn); extern void __init forward_init(void); /* sock.c */ -extern struct mutex sock_bufferlimits_lock; - extern void connreset_sbt(struct conn *cn); extern void unreserve_sock_buffer(struct conn *cn); @@ -1213,6 +1218,31 @@ static inline void set_last_act(struct conn *src_in_l) _set_last_act(src_in_l); } +#define PAGESIZE (1 << PAGE_SHIFT) + +static inline __u32 buf_optlen(__u32 datalen) +{ + __u32 optlen = 128; + while (optlen < datalen && optlen < PAGESIZE && optlen < 16384) + optlen = (optlen << 1); + return optlen; +} + + +inline static void databuf_item_free(struct data_buf_item *item) +{ + if (item->type == TYPE_BUF) { + kfree(item->buf); + kmem_cache_free(data_buf_item_slab, item); + } else if (item->type == TYPE_SKB) { + struct sk_buff *skb = skb_from_pstate(container_of(item, + struct skb_procstate, funcstate.rcv.dbi)); + kfree_skb(skb); + } else { + BUG(); + } +} + static inline int seqno_before(__u32 seqno1, __u32 seqno2) { diff --git a/net/cor/cpacket_parse.c b/net/cor/cpacket_parse.c index 2c3b0f92e4c..74166682e53 100644 --- a/net/cor/cpacket_parse.c +++ b/net/cor/cpacket_parse.c @@ -78,7 +78,7 @@ static int reserve_cpacket_buffer(struct conn *trtg_unconn_l, int fromresume) spin_unlock_irqrestore(&cpacket_bufferlimits_lock, iflags); trtg_unconn_l->target.unconnected.cmdparams = kmalloc(paramlen, - GFP_KERNEL); + GFP_ATOMIC); if (unlikely(trtg_unconn_l->target.unconnected.cmdparams == 0)) { spin_lock_irqsave(&cpacket_bufferlimits_lock, iflags); cpacket_kmallocfailed = 1; @@ -96,9 +96,9 @@ static int reserve_cpacket_buffer(struct conn *trtg_unconn_l, int fromresume) return 1; } - mutex_lock(&(trtg_unconn_l->reversedir->rcv_lock)); + spin_lock_bh(&(trtg_unconn_l->reversedir->rcv_lock)); trtg_unconn_l->reversedir->data_buf.cpacket_buffer += resplen; - mutex_unlock(&(trtg_unconn_l->reversedir->rcv_lock)); + spin_unlock_bh(&(trtg_unconn_l->reversedir->rcv_lock)); if (putconn) kref_put(&(trtg_unconn_l->ref), free_conn); @@ -133,7 +133,7 @@ static void cpacket_buffer_resume(struct work_struct *work) spin_unlock_irqrestore(&cpacket_bufferlimits_lock, iflags); - mutex_lock(&(trtg_unconn->rcv_lock)); + 
spin_lock_bh(&(trtg_unconn->rcv_lock)); if (unlikely(trtg_unconn->targettype != TARGET_UNCONNECTED)) { doparse = 0; @@ -155,7 +155,7 @@ static void cpacket_buffer_resume(struct work_struct *work) spin_unlock_irqrestore(&cpacket_bufferlimits_lock, iflags); unlock: - mutex_unlock(&(trtg_unconn->rcv_lock)); + spin_unlock_bh(&(trtg_unconn->rcv_lock)); if (likely(doparse)) parse(trtg_unconn, 1); @@ -273,9 +273,9 @@ static void send_resp_ok(struct conn *trtg_unconn_l) { __u8 respcode = CDR_EXECOK; - mutex_lock(&(trtg_unconn_l->reversedir->rcv_lock)); + spin_lock_bh(&(trtg_unconn_l->reversedir->rcv_lock)); receive_cpacketresp(trtg_unconn_l->reversedir, (char *) &respcode, 1); - mutex_unlock(&(trtg_unconn_l->reversedir->rcv_lock)); + spin_unlock_bh(&(trtg_unconn_l->reversedir->rcv_lock)); } static char *get_error_reason_text(__u16 reasoncode) @@ -322,10 +322,10 @@ static void send_resp_failed(struct conn *trtg_unconn_l, __u16 reasoncode) reasonlen_len = encode_len(hdr + 3, 4, reasonlen); BUG_ON(reasonlen_len <= 0); - mutex_lock(&(trtg_unconn_l->reversedir->rcv_lock)); + spin_lock_bh(&(trtg_unconn_l->reversedir->rcv_lock)); receive_cpacketresp(trtg_unconn_l->reversedir, hdr, 3 + reasonlen_len); receive_cpacketresp(trtg_unconn_l->reversedir, reasontext, reasonlen); - mutex_unlock(&(trtg_unconn_l->reversedir->rcv_lock)); + spin_unlock_bh(&(trtg_unconn_l->reversedir->rcv_lock)); } static void send_resp_bin(struct conn *trtg_unconn_l, char *buf, __u32 len) @@ -338,10 +338,10 @@ static void send_resp_bin(struct conn *trtg_unconn_l, char *buf, __u32 len) BUG_ON(len_len <= 0); - mutex_lock(&(trtg_unconn_l->reversedir->rcv_lock)); + spin_lock_bh(&(trtg_unconn_l->reversedir->rcv_lock)); receive_cpacketresp(trtg_unconn_l->reversedir, hdr, len_len + 1); receive_cpacketresp(trtg_unconn_l->reversedir, buf, len); - mutex_unlock(&(trtg_unconn_l->reversedir->rcv_lock)); + spin_unlock_bh(&(trtg_unconn_l->reversedir->rcv_lock)); } static void parse_set_tos(struct conn *trtg_unconn_l) @@ -392,7 +392,7 @@ static void parse_list_neigh(struct conn *trtg_unconn_l) read += rc; - buf = kmalloc(LISTNEIGH_RESP_MAXSIZE, GFP_KERNEL); + buf = kmalloc(LISTNEIGH_RESP_MAXSIZE, GFP_ATOMIC); if (unlikely(buf == 0)) { send_resp_failed(trtg_unconn_l, @@ -598,7 +598,7 @@ static void read_hdr(struct conn *trtg_unconn_l) void parse(struct conn *trtg_unconn, int fromresume) { start: - mutex_lock(&(trtg_unconn->rcv_lock)); + spin_lock_bh(&(trtg_unconn->rcv_lock)); if (unlikely(trtg_unconn->isreset != 0)) goto out; @@ -643,17 +643,17 @@ start: int flush = 0; BUG_ON(trtg_unconn->is_client == 0); - mutex_lock(&(trtg_unconn->reversedir->rcv_lock)); + spin_lock_bh(&(trtg_unconn->reversedir->rcv_lock)); cpacket_buffer = trtg_unconn->reversedir->data_buf.cpacket_buffer; - mutex_unlock(&(trtg_unconn->reversedir->rcv_lock)); + spin_unlock_bh(&(trtg_unconn->reversedir->rcv_lock)); parse_cmd(trtg_unconn); kfree(cmd); - mutex_lock(&(trtg_unconn->reversedir->rcv_lock)); + spin_lock_bh(&(trtg_unconn->reversedir->rcv_lock)); trtg_unconn->reversedir->data_buf.cpacket_buffer = trtg_unconn->reversedir->data_buf.totalsize + trtg_unconn->reversedir->data_buf.overhead; cpacket_buffer -= trtg_unconn->reversedir->data_buf.cpacket_buffer; - mutex_unlock(&(trtg_unconn->reversedir->rcv_lock)); + spin_unlock_bh(&(trtg_unconn->reversedir->rcv_lock)); free_cpacket_buffer(paramlen + cpacket_buffer); if (trtg_unconn->targettype != TARGET_UNCONNECTED) { @@ -666,7 +666,7 @@ start: trtg_unconn->target.unconnected.paramlen = 0; } - mutex_unlock(&(trtg_unconn->rcv_lock)); + 
spin_unlock_bh(&(trtg_unconn->rcv_lock)); if (flush) flush_buf(trtg_unconn); flush_buf(trtg_unconn->reversedir); @@ -676,7 +676,7 @@ start: } out: - mutex_unlock(&(trtg_unconn->rcv_lock)); + spin_unlock_bh(&(trtg_unconn->rcv_lock)); } int __init cor_cpacket_init(void) diff --git a/net/cor/credits.c b/net/cor/credits.c index 1e08bc49930..b12c87b1219 100644 --- a/net/cor/credits.c +++ b/net/cor/credits.c @@ -404,29 +404,29 @@ crates: static void credits_unlock_conn(struct conn *cn, __u32 hints) { if ((hints & 1) != 0) - mutex_unlock(&(cn->rcv_lock)); + spin_unlock_bh(&(cn->rcv_lock)); if ((hints & 2) != 0) - mutex_unlock(&(cn->reversedir->rcv_lock)); + spin_unlock_bh(&(cn->reversedir->rcv_lock)); if ((hints & 4) != 0) - mutex_unlock(&(cn->rcv_lock)); + spin_unlock_bh(&(cn->rcv_lock)); } static __u32 credits_lock_conn(struct conn *cn) { - mutex_lock(&(cn->rcv_lock)); + spin_lock_bh(&(cn->rcv_lock)); if (cn->sourcetype == SOURCE_IN && cn->targettype == TARGET_OUT) { if (likely(cn->target.out.conn_id != 0)) return 1; } if (cn->is_client) { - mutex_lock(&(cn->reversedir->rcv_lock)); + spin_lock_bh(&(cn->reversedir->rcv_lock)); return 6; } else { - mutex_unlock(&(cn->rcv_lock)); + spin_unlock_bh(&(cn->rcv_lock)); - mutex_lock(&(cn->reversedir->rcv_lock)); - mutex_lock(&(cn->rcv_lock)); + spin_lock_bh(&(cn->reversedir->rcv_lock)); + spin_lock_bh(&(cn->rcv_lock)); return 3; } @@ -613,11 +613,14 @@ static void background_refresh_credits(struct work_struct *work) } else { cn = container_of(credit_refresh_conns.next, struct conn, credit_list); + kref_get(&(cn->ref)); } spin_unlock_irqrestore(&credits_list_lock, iflags); - if (cn != 0) + if (cn != 0) { rc = refresh_conn_credits(cn, 1, 0); + kref_put(&(cn->ref), free_conn); + } } if (likely(rc > 0)) { diff --git a/net/cor/forward.c b/net/cor/forward.c index bbee4b4fcf6..1ebf939330f 100644 --- a/net/cor/forward.c +++ b/net/cor/forward.c @@ -24,27 +24,20 @@ struct kmem_cache *data_buf_item_slab; -#define PAGESIZE (1 << PAGE_SHIFT) - void databuf_init(struct conn *cn_init) { memset(&(cn_init->data_buf), 0, sizeof(cn_init->data_buf)); INIT_LIST_HEAD(&(cn_init->data_buf.items)); } -static void databuf_item_free(struct conn *cn_l, struct data_buf_item *item) +static inline void databuf_item_unlink(struct conn *cn_l, struct data_buf_item *item) { list_del(&(item->buf_list)); if (item->type == TYPE_BUF) { cn_l->data_buf.overhead -= sizeof(struct data_buf_item) + item->buflen - item->datalen; - kfree(item->buf); - kmem_cache_free(data_buf_item_slab, item); } else if (item->type == TYPE_SKB) { - struct sk_buff *skb = skb_from_pstate(container_of(item, - struct skb_procstate, funcstate.rcv2.dbi)); cn_l->data_buf.overhead -= sizeof(struct sk_buff); - kfree_skb(skb); } else { BUG(); } @@ -59,7 +52,8 @@ void databuf_ackdiscard(struct conn *cn_l) struct data_buf_item, buf_list); freed += item->datalen; - databuf_item_free(cn_l, item); + databuf_item_unlink(cn_l, item); + databuf_item_free(item); } cn_l->data_buf.totalsize -= freed; @@ -106,17 +100,14 @@ static void databuf_nextreadchunk(struct conn *cn_l) } } -static int _databuf_pull(struct conn *cn_l, char *dst, int len, int userbuf) +void databuf_pull(struct conn *cn_l, char *dst, int len) { - int totalcpy = 0; - BUG_ON(cn_l->data_buf.read_remaining < len); if (cn_l->data_buf.lastread == 0) databuf_nextreadchunk(cn_l); while(len > 0) { - int rc = 0; int cpy = len; char *srcbufcpystart = 0; @@ -132,81 +123,59 @@ static int _databuf_pull(struct conn *cn_l, char *dst, int len, int userbuf) if (cpy > srcbufcpylen) cpy 
= srcbufcpylen; - if (userbuf) { - int notcopied = copy_to_user(dst, srcbufcpystart, cpy); - cpy -= notcopied; - if (unlikely(notcopied > 0)) - rc = -EFAULT; - } else { - memcpy(dst, srcbufcpystart, cpy); - } + memcpy(dst, srcbufcpystart, cpy); dst += cpy; len -= cpy; - totalcpy += cpy; cn_l->data_buf.read_remaining -= cpy; cn_l->data_buf.last_read_offset += cpy; if (cpy == srcbufcpylen) databuf_nextreadchunk(cn_l); - - if (unlikely(rc < 0)) { - if (totalcpy == 0) - totalcpy = rc; - break; - } } - - return totalcpy; } -void databuf_pull(struct conn *cn_l, char *dst, int len) +void databuf_pull_dbi(struct cor_sock *cs_rl, struct conn *trgt_sock_l) { - _databuf_pull(cn_l, dst, len, 0); -} + struct data_buf_item *dbi = 0; + BUG_ON(cs_rl->type != SOCKTYPE_CONN); + BUG_ON(cs_rl->data.conn.rcvitem != 0); -size_t databuf_pulluser(struct conn *trgt_sock_l, struct msghdr *msg) -{ - size_t copied = 0; - int iovidx = 0; - int iovread = 0; + if (trgt_sock_l->data_buf.read_remaining == 0) + return; - while (iovidx < msg->msg_iovlen) { - int rc; + if (trgt_sock_l->data_buf.lastread == 0) + databuf_nextreadchunk(trgt_sock_l); - struct iovec *iov = msg->msg_iov + iovidx; - __user char *msg = iov->iov_base + iovread; - unsigned int len = iov->iov_len - iovread; + dbi = trgt_sock_l->data_buf.lastread; - if (len == 0) { - iovidx++; - iovread = 0; - continue; - } + BUG_ON(dbi == 0); - if (trgt_sock_l->data_buf.read_remaining == 0) { - rc = -EAGAIN; - } else { - if (len > trgt_sock_l->data_buf.read_remaining) - len = trgt_sock_l->data_buf.read_remaining; + BUG_ON(&(dbi->buf_list) != trgt_sock_l->data_buf.items.next); - rc = _databuf_pull(trgt_sock_l, msg, len, 1); - } + cs_rl->data.conn.rcvitem = dbi; + cs_rl->data.conn.rcvoffset = trgt_sock_l->data_buf.last_read_offset; - BUG_ON(rc == 0); + trgt_sock_l->data_buf.first_offset += dbi->datalen; + trgt_sock_l->data_buf.totalsize -= dbi->datalen; + trgt_sock_l->data_buf.read_remaining -= dbi->datalen; - if (rc < 0) { - if (copied == 0) - copied = rc; - break; - } - - copied += rc; - iovread += rc; + if (unlikely(trgt_sock_l->data_buf.cpacket_buffer != 0)) { + __u32 amount = dbi->datalen < + trgt_sock_l->data_buf.cpacket_buffer ? + dbi->datalen : + trgt_sock_l->data_buf.cpacket_buffer; + free_cpacket_buffer(amount); + trgt_sock_l->data_buf.cpacket_buffer -= amount; } - return copied; + databuf_item_unlink(trgt_sock_l, dbi); + + trgt_sock_l->data_buf.lastread = 0; + trgt_sock_l->data_buf.last_read_offset = 0; + + /* databuf_item_free(firstitem); */ } void databuf_unpull(struct conn *trgt_out_l, __u32 bytes) @@ -294,7 +263,8 @@ void databuf_ack(struct conn *trgt_out_l, __u32 pos) trgt_out_l->data_buf.first_offset += firstitem->datalen; acked += firstitem->datalen; - databuf_item_free(trgt_out_l, firstitem); + databuf_item_unlink(trgt_out_l, firstitem); + databuf_item_free(firstitem); } trgt_out_l->data_buf.totalsize -= acked; @@ -303,7 +273,7 @@ void databuf_ack(struct conn *trgt_out_l, __u32 pos) trgt_out_l->data_buf.overhead != 0); if (unlikely(trgt_out_l->data_buf.cpacket_buffer != 0)) { - __u32 amount = acked > trgt_out_l->data_buf.cpacket_buffer ? + __u32 amount = acked < trgt_out_l->data_buf.cpacket_buffer ? 
acked : trgt_out_l->data_buf.cpacket_buffer; free_cpacket_buffer(amount); trgt_out_l->data_buf.cpacket_buffer -= amount; @@ -328,7 +298,8 @@ void databuf_ackread(struct conn *cn_l) acked += firstitem->datalen; - databuf_item_free(cn_l, firstitem); + databuf_item_unlink(cn_l, firstitem); + databuf_item_free(firstitem); } cn_l->data_buf.first_offset += acked; @@ -337,7 +308,7 @@ void databuf_ackread(struct conn *cn_l) BUG_ON(cn_l->data_buf.totalsize == 0 && cn_l->data_buf.overhead != 0); if (unlikely(cn_l->data_buf.cpacket_buffer != 0)) { - __u32 amount = acked > cn_l->data_buf.cpacket_buffer ? + __u32 amount = acked < cn_l->data_buf.cpacket_buffer ? acked : cn_l->data_buf.cpacket_buffer; free_cpacket_buffer(amount); cn_l->data_buf.cpacket_buffer -= amount; @@ -347,9 +318,10 @@ void databuf_ackread(struct conn *cn_l) refresh_speedstat(cn_l, acked); } -static __s64 _receive_buf(struct conn *cn_l, char *buf, __u32 len, int userbuf, - __u32 maxcpy, __u32 maxusage) +__s64 receive_buf(struct conn *cn_l, char *buf, __u32 datalen, __u32 buflen, + int forcecpy) { + char *freewhenfinished = 0; struct data_buf_item *item = 0; __s64 totalcpy = 0; @@ -359,99 +331,85 @@ static __s64 _receive_buf(struct conn *cn_l, char *buf, __u32 len, int userbuf, item = container_of(last, struct data_buf_item, buf_list); } - while (len > 0) { + while (datalen > 0) { int rc = 0; - int cpy = len; - - if (item == 0 || item->buflen <= item->datalen) { - __u32 buflen = len; - - if (maxusage != 0) { - if (cn_l->data_buf.totalsize + - cn_l->data_buf.overhead > - maxusage) { - rc = -EAGAIN; - goto error; - } - - buflen = maxusage - cn_l->data_buf.totalsize - - cn_l->data_buf.overhead - - sizeof(struct data_buf_item); - } else { - if (totalcpy + 64 > maxcpy && - totalcpy + len > maxcpy) { - rc = -EAGAIN; - goto error; - } + int cpy = datalen; - if (totalcpy + buflen < maxcpy) - buflen = maxcpy - totalcpy; - } + #warning todo convert to bugon and do check on caller + if (unlikely(unlikely(cn_l->data_buf.totalsize + datalen > + (1 << 30)) || + unlikely(cn_l->data_buf.overhead > (1<< 30)))) { + rc = -EAGAIN; + goto error; + } - if (buflen < 64) - buflen = 64; - if (buflen > PAGESIZE) - buflen = PAGESIZE; - if (buflen > 32768) - buflen = 32768; - - if (unlikely(unlikely(cn_l->data_buf.totalsize + - buflen > (1 << 30)) || unlikely( - cn_l->data_buf.overhead > (1 << 30)))) { - rc = -EAGAIN; - goto error; - } + if (forcecpy == 0 && item != 0 && + datalen < (item->buflen - item->datalen) && + datalen*2 < (buflen + + sizeof(struct data_buf_item))) { + forcecpy = 1; + freewhenfinished = buf; + } - item = kmem_cache_alloc(data_buf_item_slab, GFP_KERNEL); + if (forcecpy == 0 || item == 0 || + item->buflen <= item->datalen) { + item = kmem_cache_alloc(data_buf_item_slab, GFP_ATOMIC); if (unlikely(item == 0)) { rc = -ENOMEM; goto error; } + memset(item, 0, sizeof(item)); item->type = TYPE_BUF; - item->buf = kmalloc(buflen, GFP_KERNEL); - if (unlikely(item->buf == 0)) { - kmem_cache_free(data_buf_item_slab, item); - rc = -ENOMEM; - goto error; + if (forcecpy == 0) { + item->buf = buf; + item->datalen = datalen; + item->buflen = buflen; + } else { + buflen = buf_optlen(datalen); + item->buf = kmalloc(buflen, GFP_ATOMIC); + + if (unlikely(item->buf == 0)) { + kmem_cache_free(data_buf_item_slab, + item); + rc = -ENOMEM; + goto error; + } + item->datalen = 0; + item->buflen = buflen; } - item->datalen = 0; list_add_tail(&(item->buf_list), &(cn_l->data_buf.items)); - item->buflen = buflen; - cn_l->data_buf.overhead += buflen + + + 
cn_l->data_buf.overhead += item->buflen + sizeof(struct data_buf_item); } - BUG_ON(item->type != TYPE_BUF); - BUG_ON(item->buflen <= item->datalen); + if (forcecpy == 0) { + cpy = item->datalen; + } else { + BUG_ON(item->type != TYPE_BUF); + BUG_ON(item->buflen <= item->datalen); - if (item->buflen - item->datalen < cpy) - cpy = (item->buflen - item->datalen); + if (item->buflen - item->datalen < cpy) + cpy = (item->buflen - item->datalen); - if (userbuf) { - int notcopied = copy_from_user(item->buf + - item->datalen, buf, cpy); - cpy -= notcopied; - if (unlikely(notcopied > 0)) - rc = -EFAULT; - } else { memcpy(item->buf + item->datalen, buf, cpy); + item->datalen += cpy; } buf += cpy; - len -= cpy; + datalen -= cpy; + cn_l->data_buf.read_remaining += cpy; cn_l->data_buf.totalsize += cpy; cn_l->data_buf.overhead -= cpy; - BUG_ON(cn_l->data_buf.totalsize == 0 && - cn_l->data_buf.overhead != 0); + BUG_ON(cn_l->data_buf.totalsize != 0 && + cn_l->data_buf.overhead == 0); totalcpy += cpy; - item->datalen += cpy; - error: if (unlikely(rc < 0)) { if (totalcpy == 0) @@ -460,47 +418,10 @@ error: } } - return totalcpy; -} - -__s64 receive_userbuf(struct conn *src_sock_l, struct msghdr *msg, __u32 maxcpy, - __u32 maxusage) -{ - __s64 copied = 0; - int iovidx = 0; - int iovread = 0; - - while (iovidx < msg->msg_iovlen) { - struct iovec *iov = msg->msg_iov + iovidx; - __user char *userbuf = iov->iov_base + iovread; - __u32 len = iov->iov_len - iovread; - __s64 rc; - - if (len == 0) { - iovidx++; - iovread = 0; - continue; - } - - BUG_ON(copied < 0); - BUG_ON(copied > maxcpy); - rc = _receive_buf(src_sock_l, userbuf, len, 1, maxcpy - copied, - maxusage); - - if (rc < 0) { - if (copied == 0) - copied = rc; - break; - } - - copied += rc; - iovread += rc; - - if (rc < len) - break; - } + if (freewhenfinished != 0) + kfree(freewhenfinished); - return copied; + return totalcpy; } void receive_cpacketresp(struct conn *trtg_unconn_l, char *buf, int len) @@ -508,7 +429,7 @@ void receive_cpacketresp(struct conn *trtg_unconn_l, char *buf, int len) __s64 rc; BUG_ON(trtg_unconn_l->data_buf.cpacket_buffer < trtg_unconn_l->data_buf.totalsize + len); - rc = _receive_buf(trtg_unconn_l, buf, len, 0, len, 0); + rc = receive_buf(trtg_unconn_l, buf, len, len, 1); BUG_ON(rc < 0); BUG_ON(rc < len); } @@ -516,7 +437,7 @@ void receive_cpacketresp(struct conn *trtg_unconn_l, char *buf, int len) int receive_skb(struct conn *src_in_l, struct sk_buff *skb) { struct skb_procstate *ps = skb_pstate(skb); - struct data_buf_item *item = &(ps->funcstate.rcv2.dbi); + struct data_buf_item *item = &(ps->funcstate.rcv.dbi); if (unlikely(unlikely(src_in_l->data_buf.totalsize + skb->len > (1 << 30)) || unlikely(src_in_l->data_buf.overhead > @@ -536,29 +457,24 @@ int receive_skb(struct conn *src_in_l, struct sk_buff *skb) return 0; } -static void _wake_sender_in(struct conn *src_in_l) -{ - drain_ooo_queue(src_in_l); - mutex_unlock(&(src_in_l->rcv_lock)); - get_window(src_in_l, 0, 0, 0); -} - void wake_sender(struct conn *cn) { unreserve_sock_buffer(cn); - mutex_lock(&(cn->rcv_lock)); + spin_lock_bh(&(cn->rcv_lock)); switch (cn->sourcetype) { case SOURCE_NONE: - mutex_unlock(&(cn->rcv_lock)); + spin_unlock_bh(&(cn->rcv_lock)); parse(cn->reversedir, 0); break; case SOURCE_SOCK: wake_up_interruptible(&(cn->source.sock.wait)); - mutex_unlock(&(cn->rcv_lock)); + spin_unlock_bh(&(cn->rcv_lock)); break; case SOURCE_IN: - _wake_sender_in(cn); /* mutex_unlock inside */ + drain_ooo_queue(cn); + spin_unlock_bh(&(cn->rcv_lock)); + get_window(cn, 0, 0, 
0); break; default: BUG(); @@ -569,11 +485,11 @@ void flush_buf(struct conn *cn) { int rc; int sent = 0; - mutex_lock(&(cn->rcv_lock)); + spin_lock_bh(&(cn->rcv_lock)); switch (cn->targettype) { case TARGET_UNCONNECTED: - mutex_unlock(&(cn->rcv_lock)); + spin_unlock_bh(&(cn->rcv_lock)); parse(cn, 0); break; case TARGET_SOCK: @@ -584,16 +500,16 @@ void flush_buf(struct conn *cn) cn->data_buf.cpacket_buffer >= BUFFERLIMIT_SOCK_SOCK/2) wake_up_interruptible(&(cn->target.sock.wait)); - mutex_unlock(&(cn->rcv_lock)); + spin_unlock_bh(&(cn->rcv_lock)); break; case TARGET_OUT: rc = flush_out(cn, 0, 0); - mutex_unlock(&(cn->rcv_lock)); + spin_unlock_bh(&(cn->rcv_lock)); sent = (rc == RC_FLUSH_CONN_OUT_OK_SENT); break; case TARGET_DISCARD: databuf_ackdiscard(cn); - mutex_unlock(&(cn->rcv_lock)); + spin_unlock_bh(&(cn->rcv_lock)); sent = 1; break; default: diff --git a/net/cor/kpacket_gen.c b/net/cor/kpacket_gen.c index 3892b729bed..4b2199a4e1b 100644 --- a/net/cor/kpacket_gen.c +++ b/net/cor/kpacket_gen.c @@ -176,7 +176,7 @@ static inline int isurgent(struct control_msg_out *cm) static struct control_msg_out *__alloc_control_msg(void) { struct control_msg_out *cm = kmem_cache_alloc(controlmsg_slab, - GFP_KERNEL); + GFP_ATOMIC); if (unlikely(cm == 0)) return 0; memset(cm, 0, sizeof(struct control_msg_out)); @@ -280,16 +280,16 @@ void free_control_msg(struct control_msg_out *cm) if (cm->type == MSGTYPE_ACK_CONN) { struct conn *trgt_out = cm->msg.ack_conn.src_in->reversedir; BUG_ON(cm->msg.ack_conn.src_in == 0); - mutex_lock(&(trgt_out->rcv_lock)); + spin_lock_bh(&(trgt_out->rcv_lock)); BUG_ON(trgt_out->targettype != TARGET_OUT); if ((cm->msg.ack_conn.flags & KP_ACK_CONN_FLAGS_CREDITS) != 0 && trgt_out->target.out.decaytime_send_allowed != 0) { trgt_out->target.out.decaytime_send_allowed = 1; - mutex_unlock(&(trgt_out->rcv_lock)); + spin_unlock_bh(&(trgt_out->rcv_lock)); refresh_conn_credits(trgt_out, 0, 0); } else { - mutex_unlock(&(trgt_out->rcv_lock)); + spin_unlock_bh(&(trgt_out->rcv_lock)); } kref_put(&(cm->msg.ack_conn.src_in->ref), free_conn); @@ -385,27 +385,25 @@ static void readd_control_retrans(struct control_retrans *cr) list_del(&(cm->lh)); if (cm->type == MSGTYPE_ACK_CONN) { struct conn *cn_l = cm->msg.ack_conn.src_in; - mutex_lock(&(cn_l->rcv_lock)); + spin_lock_bh(&(cn_l->rcv_lock)); if (unlikely(ackconn_prepare_readd(cn_l, cm) == 0)) { free_control_msg(cm); } else { mergeadd_ackconn(cn_l, cm); } - mutex_unlock(&(cn_l->rcv_lock)); + spin_unlock_bh(&(cn_l->rcv_lock)); } else { add_control_msg(cm, 1); } } } -void retransmit_timerfunc(struct work_struct *work) +void retransmit_taskfunc(unsigned long arg) { + struct neighbor *nb = (struct neighbor *) arg; unsigned long iflags; - struct neighbor *nb = container_of(to_delayed_work(work), - struct neighbor, retrans_timer); - int nbstate; int nbput = 0; @@ -417,7 +415,7 @@ void retransmit_timerfunc(struct work_struct *work) struct control_retrans *cr = 0; struct retransmit_matchparam rm; - spin_lock_irqsave(&(nb->retrans_lock), iflags); + spin_lock_bh(&(nb->retrans_lock)); if (list_empty(&(nb->retrans_list))) { nb->retrans_timer_running = 0; @@ -436,7 +434,7 @@ void retransmit_timerfunc(struct work_struct *work) list_del(&(cr->timeout_list)); if (unlikely(nbstate == NEIGHBOR_STATE_KILLED)) { - spin_unlock_irqrestore(&(nb->retrans_lock), iflags); + spin_unlock_bh(&(nb->retrans_lock)); htable_delete(&retransmits, rm_to_key(&rm), &rm, free_control_retrans); @@ -446,8 +444,7 @@ void retransmit_timerfunc(struct work_struct *work) if 
(time_after(cr->timeout, jiffies)) { list_add(&(cr->timeout_list), &(nb->retrans_list)); - schedule_delayed_work(&(nb->retrans_timer), - cr->timeout - jiffies); + mod_timer(&(nb->retrans_timer_conn), cr->timeout); break; } @@ -455,23 +452,27 @@ void retransmit_timerfunc(struct work_struct *work) free_control_retrans))) BUG(); - spin_unlock_irqrestore(&(nb->retrans_lock), iflags); + spin_unlock_bh(&(nb->retrans_lock)); readd_control_retrans(cr); kref_put(&(cr->ref), free_control_retrans); } - spin_unlock_irqrestore(&(nb->retrans_lock), iflags); + spin_unlock_bh(&(nb->retrans_lock)); if (nbput) kref_put(&(nb->ref), neighbor_free); } -static void schedule_retransmit(struct control_retrans *cr, struct neighbor *nb) +void retransmit_timerfunc(unsigned long arg) { - unsigned long iflags; + struct neighbor *nb = (struct neighbor *) arg; + tasklet_schedule(&(nb->retrans_task)); +} +static void schedule_retransmit(struct control_retrans *cr, struct neighbor *nb) +{ struct retransmit_matchparam rm; int first; @@ -480,32 +481,29 @@ static void schedule_retransmit(struct control_retrans *cr, struct neighbor *nb) set_retrans_timeout(cr, nb); - spin_lock_irqsave(&(nb->retrans_lock), iflags); + spin_lock_bh(&(nb->retrans_lock)); htable_insert(&retransmits, (char *) cr, rm_to_key(&rm)); first = list_empty(&(nb->retrans_list)); list_add_tail(&(cr->timeout_list), &(nb->retrans_list)); if (first && nb->retrans_timer_running == 0) { - schedule_delayed_work(&(nb->retrans_timer), - cr->timeout - jiffies); + mod_timer(&(nb->retrans_timer), cr->timeout); nb->retrans_timer_running = 1; kref_get(&(nb->ref)); } - spin_unlock_irqrestore(&(nb->retrans_lock), iflags); + spin_unlock_bh(&(nb->retrans_lock)); } void kern_ack_rcvd(struct neighbor *nb, __u32 seqno) { - unsigned long iflags; - struct control_retrans *cr = 0; struct retransmit_matchparam rm; rm.seqno = seqno; rm.nb = nb; - spin_lock_irqsave(&(nb->retrans_lock), iflags); + spin_lock_bh(&(nb->retrans_lock)); cr = (struct control_retrans *) htable_get(&retransmits, rm_to_key(&rm), &rm); @@ -524,7 +522,7 @@ void kern_ack_rcvd(struct neighbor *nb, __u32 seqno) list_del(&(cr->timeout_list)); out: - spin_unlock_irqrestore(&(nb->retrans_lock), iflags); + spin_unlock_bh(&(nb->retrans_lock)); if (cr != 0) { kref_put(&(cr->ref), free_control_retrans); /* htable_get */ @@ -685,7 +683,7 @@ static int add_pong(struct sk_buff *skb, struct control_retrans *cr, static __u16 get_credits(struct conn *sconn) { __u16 ret; - mutex_lock(&(sconn->reversedir->rcv_lock)); + spin_lock_bh(&(sconn->reversedir->rcv_lock)); BUG_ON(sconn->reversedir->targettype != TARGET_OUT); BUG_ON(sconn->reversedir->target.out.decaytime_last >= 1024); @@ -693,7 +691,7 @@ static __u16 get_credits(struct conn *sconn) ret = sconn->reversedir->target.out.decaytime_last + ( sconn->reversedir->target.out.decaytime_seqno << 10); - mutex_unlock(&(sconn->reversedir->rcv_lock)); + spin_unlock_bh(&(sconn->reversedir->rcv_lock)); return ret; } @@ -878,11 +876,11 @@ static void requeue_message(struct control_msg_out *cm) if (cm->type == MSGTYPE_ACK_CONN) { struct conn *cn_l = cm->msg.ack_conn.src_in; - mutex_lock(&(cn_l->rcv_lock)); + spin_lock_bh(&(cn_l->rcv_lock)); if (unlikely(ackconn_prepare_readd(cn_l, cm) == 0)) { free_control_msg(cm); } else { - mutex_lock(&(cm->nb->cmsg_lock)); + spin_lock_bh(&(cm->nb->cmsg_lock)); list_add(&(cm->lh), &(cm->nb->control_msgs_out)); cm->nb->cmlength += cm->length; @@ -891,9 +889,9 @@ static void requeue_message(struct control_msg_out *cm) &(cn_l->source.in.acks_pending)); 
try_merge_ackconns(cn_l, cm); - mutex_unlock(&(cm->nb->cmsg_lock)); + spin_unlock_bh(&(cm->nb->cmsg_lock)); } - mutex_unlock(&(cn_l->rcv_lock)); + spin_unlock_bh(&(cn_l->rcv_lock)); return; } @@ -944,9 +942,9 @@ static __u32 __send_messages(struct neighbor *nb, struct sk_buff *skb, int rc; struct control_msg_out *cm; - mutex_lock(&(nb->cmsg_lock)); + spin_lock_bh(&(nb->cmsg_lock)); cm = dequeue_message(nb, urgentonly); - mutex_unlock(&(nb->cmsg_lock)); + spin_unlock_bh(&(nb->cmsg_lock)); if (cm == 0) break; @@ -997,7 +995,7 @@ static int _send_messages(struct neighbor *nb, struct sk_buff *skb, int ping, struct control_msg_out *split_conndata = 0; __u32 sc_sendlen = 0; - mutex_lock(&(nb->cmsg_lock)); + spin_lock_bh(&(nb->cmsg_lock)); if (ping != 0) { int rc; @@ -1011,7 +1009,7 @@ static int _send_messages(struct neighbor *nb, struct sk_buff *skb, int ping, if (likely(urgentonly == 0) && unlikely(nb->max_cmsg_delay_sent == 0)) length += __send_messages_smcd(nb, skb, cr, spaceleft - length); - mutex_unlock(&(nb->cmsg_lock)); + spin_unlock_bh(&(nb->cmsg_lock)); length += __send_messages(nb, skb, cr, spaceleft - length, urgentonly, &split_conndata, &sc_sendlen); @@ -1127,11 +1125,11 @@ static int reset_timeouted_conn(struct neighbor *nb, struct conn *src_in) int resetted = 0; if (src_in->is_client) { - mutex_lock(&(src_in->rcv_lock)); - mutex_lock(&(src_in->reversedir->rcv_lock)); + spin_lock_bh(&(src_in->rcv_lock)); + spin_lock_bh(&(src_in->reversedir->rcv_lock)); } else { - mutex_lock(&(src_in->reversedir->rcv_lock)); - mutex_lock(&(src_in->rcv_lock)); + spin_lock_bh(&(src_in->reversedir->rcv_lock)); + spin_lock_bh(&(src_in->rcv_lock)); } resetted = reset_timeouted_conn_needed(nb, src_in); @@ -1149,11 +1147,11 @@ static int reset_timeouted_conn(struct neighbor *nb, struct conn *src_in) unlock: if (src_in->is_client) { - mutex_unlock(&(src_in->rcv_lock)); - mutex_unlock(&(src_in->reversedir->rcv_lock)); + spin_unlock_bh(&(src_in->rcv_lock)); + spin_unlock_bh(&(src_in->reversedir->rcv_lock)); } else { - mutex_unlock(&(src_in->reversedir->rcv_lock)); - mutex_unlock(&(src_in->rcv_lock)); + spin_unlock_bh(&(src_in->reversedir->rcv_lock)); + spin_unlock_bh(&(src_in->rcv_lock)); } if (resetted) @@ -1185,9 +1183,9 @@ static void reset_timeouted_conns(struct neighbor *nb) spin_unlock_irqrestore(&(nb->conn_list_lock), iflags); - mutex_lock(&(src_in->rcv_lock)); + spin_lock_bh(&(src_in->rcv_lock)); resetted = reset_timeouted_conn_needed(nb, src_in); - mutex_unlock(&(src_in->rcv_lock)); + spin_unlock_bh(&(src_in->rcv_lock)); if (likely(resetted == 0)) goto put; @@ -1214,8 +1212,8 @@ int send_messages(struct neighbor *nb, int resume) if (likely(urgentonly == 0)) reset_timeouted_conns(nb); - mutex_lock(&(nb->send_cmsg_lock)); - mutex_lock(&(nb->cmsg_lock)); + spin_lock_bh(&(nb->send_cmsg_lock)); + spin_lock_bh(&(nb->cmsg_lock)); ping = time_to_send_ping(nb); @@ -1250,15 +1248,15 @@ int send_messages(struct neighbor *nb, int resume) if (length > targetmss) length = targetmss; - mutex_unlock(&(nb->cmsg_lock)); - skb = create_packet(nb, length, GFP_KERNEL, 0, seqno); + spin_unlock_bh(&(nb->cmsg_lock)); + skb = create_packet(nb, length, GFP_ATOMIC, 0, seqno); if (unlikely(skb == 0)) { printk(KERN_ERR "cor: send_messages: cannot allocate " "skb (out of memory?)"); goto oom; } - cr = kmem_cache_alloc(controlretrans_slab, GFP_KERNEL); + cr = kmem_cache_alloc(controlretrans_slab, GFP_ATOMIC); if (unlikely(cr == 0)) { kfree_skb(skb); printk(KERN_ERR "cor: send_messages: cannot allocate " @@ -1274,7 +1272,7 @@ int 
send_messages(struct neighbor *nb, int resume) rc = _send_messages(nb, skb, ping, cr, length, urgentonly); ping = 0; - mutex_lock(&(nb->cmsg_lock)); + spin_lock_bh(&(nb->cmsg_lock)); if (rc != 0) break; @@ -1282,7 +1280,7 @@ int send_messages(struct neighbor *nb, int resume) if (0) { oom: - mutex_lock(&(nb->cmsg_lock)); + spin_lock_bh(&(nb->cmsg_lock)); } if (rc != 0) { @@ -1290,12 +1288,12 @@ oom: qos_enqueue(nb->queue, &(nb->rb_kp), QOS_CALLER_KPACKET); } else { - atomic_set(&(nb->cmsg_work_scheduled), 0); + atomic_set(&(nb->cmsg_task_scheduled), 0); schedule_controlmsg_timer(nb); } - mutex_unlock(&(nb->cmsg_lock)); - mutex_unlock(&(nb->send_cmsg_lock)); + spin_unlock_bh(&(nb->cmsg_lock)); + spin_unlock_bh(&(nb->send_cmsg_lock)); if (resume == 0) kref_put(&(nb->ref), neighbor_free); @@ -1303,18 +1301,17 @@ oom: return rc; } -void controlmsg_workfunc(struct work_struct *work) +void controlmsg_taskfunc(unsigned long nb) { - struct neighbor *nb = container_of(work, struct neighbor, cmsg_work); - send_messages(nb, 0); + send_messages((struct neighbor *)nb, 0); } -static void schedule_cmsg_work(struct neighbor *nb) +static void schedule_cmsg_task(struct neighbor *nb) { - if (atomic_cmpxchg(&(nb->cmsg_work_scheduled), 0, 1) == 0) { + if (atomic_cmpxchg(&(nb->cmsg_task_scheduled), 0, 1) == 0) { kref_get(&(nb->ref)); atomic_cmpxchg(&(nb->cmsg_timer_running), 1, 2); - schedule_work(&(nb->cmsg_work)); + tasklet_schedule(&(nb->cmsg_task)); } } @@ -1327,7 +1324,7 @@ void controlmsg_timerfunc(unsigned long arg) BUG_ON(oldval == 0); if (likely(oldval == 1)) - schedule_cmsg_work(nb); + schedule_cmsg_task(nb); kref_put(&(nb->ref), neighbor_free); } @@ -1385,7 +1382,7 @@ void schedule_controlmsg_timer(struct neighbor *nb) return; } - if (unlikely(atomic_read(&(nb->cmsg_work_scheduled)) == 1)) + if (unlikely(atomic_read(&(nb->cmsg_task_scheduled)) == 1)) return; if (cmsg_full_packet(nb, state)) @@ -1395,7 +1392,7 @@ void schedule_controlmsg_timer(struct neighbor *nb) if (time_before_eq(timeout, jiffies)) { now: - schedule_cmsg_work(nb); + schedule_cmsg_task(nb); } else { if (atomic_xchg(&(nb->cmsg_timer_running), 1) == 0) kref_get(&(nb->ref)); @@ -1432,7 +1429,7 @@ static void add_control_msg(struct control_msg_out *cm, int retrans) cm->timeout = jiffies + msecs_to_jiffies(CMSG_INTERVAL_MS); - mutex_lock(&(cm->nb->cmsg_lock)); + spin_lock_bh(&(cm->nb->cmsg_lock)); if (isurgent(cm)) { long msgs; @@ -1476,7 +1473,7 @@ static void add_control_msg(struct control_msg_out *cm, int retrans) schedule_controlmsg_timer(cm->nb); out: - mutex_unlock(&(cm->nb->cmsg_lock)); + spin_unlock_bh(&(cm->nb->cmsg_lock)); } @@ -1690,7 +1687,7 @@ static void mergeadd_ackconn(struct conn *src_in_l, struct control_msg_out *cm) BUG_ON(src_in_l->sourcetype != SOURCE_IN); - mutex_lock(&(cm->nb->cmsg_lock)); + spin_lock_bh(&(cm->nb->cmsg_lock)); currlh = src_in_l->source.in.acks_pending.next; @@ -1708,7 +1705,7 @@ static void mergeadd_ackconn(struct conn *src_in_l, struct control_msg_out *cm) if (_try_merge_ackconn(src_in_l, cm, currcm, 1) == 0) { try_merge_ackconns(src_in_l, currcm); schedule_controlmsg_timer(currcm->nb); - mutex_unlock(&(currcm->nb->cmsg_lock)); + spin_unlock_bh(&(currcm->nb->cmsg_lock)); return; } @@ -1718,7 +1715,7 @@ static void mergeadd_ackconn(struct conn *src_in_l, struct control_msg_out *cm) list_add_tail(&(cm->msg.ack_conn.conn_acks), &(src_in_l->source.in.acks_pending)); - mutex_unlock(&(cm->nb->cmsg_lock)); + spin_unlock_bh(&(cm->nb->cmsg_lock)); add_control_msg(cm, 0); } @@ -1727,7 +1724,7 @@ static int 
try_update_ackconn_seqno(struct conn *src_in_l) { int rc = 1; - mutex_lock(&(src_in_l->source.in.nb->cmsg_lock)); + spin_lock_bh(&(src_in_l->source.in.nb->cmsg_lock)); if (list_empty(&(src_in_l->source.in.acks_pending)) == 0) { struct control_msg_out *cm = container_of( @@ -1756,7 +1753,7 @@ static int try_update_ackconn_seqno(struct conn *src_in_l) rc = 0; } - mutex_unlock(&(src_in_l->source.in.nb->cmsg_lock)); + spin_unlock_bh(&(src_in_l->source.in.nb->cmsg_lock)); return rc; } @@ -1822,7 +1819,7 @@ static int try_add_decaytime(struct conn *trgt_out_l, __u16 decaytime) int rc = 1; struct conn *src_in = trgt_out_l->reversedir; - mutex_lock(&(trgt_out_l->target.out.nb->cmsg_lock)); + spin_lock_bh(&(trgt_out_l->target.out.nb->cmsg_lock)); if (list_empty(&(src_in->source.in.acks_pending)) == 0) { struct control_msg_out *cm = container_of( @@ -1847,7 +1844,7 @@ static int try_add_decaytime(struct conn *trgt_out_l, __u16 decaytime) rc = 0; } - mutex_unlock(&(trgt_out_l->target.out.nb->cmsg_lock)); + spin_unlock_bh(&(trgt_out_l->target.out.nb->cmsg_lock)); return rc; } @@ -1890,7 +1887,7 @@ out: void free_ack_conns(struct conn *src_in_l) { int changed = 0; - mutex_lock(&(src_in_l->source.in.nb->cmsg_lock)); + spin_lock_bh(&(src_in_l->source.in.nb->cmsg_lock)); while (list_empty(&(src_in_l->source.in.acks_pending)) == 0) { struct list_head *currlh = src_in_l->source.in.acks_pending.next; @@ -1903,7 +1900,7 @@ void free_ack_conns(struct conn *src_in_l) } if (changed) schedule_controlmsg_timer(src_in_l->source.in.nb); - mutex_unlock(&(src_in_l->source.in.nb->cmsg_lock)); + spin_unlock_bh(&(src_in_l->source.in.nb->cmsg_lock)); } void send_connect_success(struct control_msg_out *cm, __u32 rcvd_conn_id, diff --git a/net/cor/kpacket_parse.c b/net/cor/kpacket_parse.c index b6b27fa90d2..f6247f2e46e 100644 --- a/net/cor/kpacket_parse.c +++ b/net/cor/kpacket_parse.c @@ -145,7 +145,7 @@ static void parse_ack_conn(struct neighbor *nb, struct sk_buff *skb) return; } - mutex_lock(&(src_in->rcv_lock)); + spin_lock_bh(&(src_in->rcv_lock)); if (unlikely(is_conn_in(src_in, nb, conn_id) == 0)) { send_connid_unknown(nb, conn_id); @@ -153,7 +153,7 @@ static void parse_ack_conn(struct neighbor *nb, struct sk_buff *skb) set_last_act(src_in); } - mutex_unlock(&(src_in->rcv_lock)); + spin_unlock_bh(&(src_in->rcv_lock)); kref_put(&(src_in->ref), free_conn); } @@ -174,18 +174,18 @@ static void parse_connect(struct neighbor *nb, struct sk_buff *skb) if (unlikely(cm == 0)) goto err; - src_in = alloc_conn(GFP_KERNEL); + src_in = alloc_conn(GFP_ATOMIC); if (unlikely(src_in == 0)) goto err; src_in->is_client = 1; - mutex_lock(&(src_in->rcv_lock)); - mutex_lock(&(src_in->reversedir->rcv_lock)); + spin_lock_bh(&(src_in->rcv_lock)); + spin_lock_bh(&(src_in->reversedir->rcv_lock)); if (unlikely(conn_init_out(src_in->reversedir, nb))) { - mutex_unlock(&(src_in->reversedir->rcv_lock)); - mutex_unlock(&(src_in->rcv_lock)); + spin_unlock_bh(&(src_in->reversedir->rcv_lock)); + spin_unlock_bh(&(src_in->rcv_lock)); kref_put(&(src_in->reversedir->ref), free_conn); kref_put(&(src_in->ref), free_conn); goto err; @@ -204,8 +204,8 @@ static void parse_connect(struct neighbor *nb, struct sk_buff *skb) src_in->reversedir->target.out.seqno_nextsend + dec_log_64_11(window); insert_reverse_connid(src_in->reversedir); - mutex_unlock(&(src_in->reversedir->rcv_lock)); - mutex_unlock(&(src_in->rcv_lock)); + spin_unlock_bh(&(src_in->reversedir->rcv_lock)); + spin_unlock_bh(&(src_in->rcv_lock)); BUG_ON(credits_seqno >= 64); BUG_ON(decaytime >= 1024); @@ 
-245,8 +245,8 @@ static void parse_conn_success(struct neighbor *nb, struct sk_buff *skb) goto err; - mutex_lock(&(src_in->reversedir->rcv_lock)); - mutex_lock(&(src_in->rcv_lock)); + spin_lock_bh(&(src_in->reversedir->rcv_lock)); + spin_lock_bh(&(src_in->rcv_lock)); if (unlikely(is_conn_in(src_in, nb, conn_id1) == 0)) goto err_unlock; @@ -278,8 +278,8 @@ static void parse_conn_success(struct neighbor *nb, struct sk_buff *skb) set = 1; } - mutex_unlock(&(src_in->reversedir->rcv_lock)); - mutex_unlock(&(src_in->rcv_lock)); + spin_unlock_bh(&(src_in->reversedir->rcv_lock)); + spin_unlock_bh(&(src_in->rcv_lock)); BUG_ON(credits_seqno >= 64); BUG_ON(decaytime >= 1024); @@ -291,8 +291,8 @@ static void parse_conn_success(struct neighbor *nb, struct sk_buff *skb) if (0) { err_unlock: - mutex_unlock(&(src_in->reversedir->rcv_lock)); - mutex_unlock(&(src_in->rcv_lock)); + spin_unlock_bh(&(src_in->reversedir->rcv_lock)); + spin_unlock_bh(&(src_in->rcv_lock)); err: send_reset_conn(nb, conn_id2, conn_id1, 0); } @@ -324,11 +324,11 @@ static void parse_reset(struct neighbor *nb, struct sk_buff *skb) return; if (src_in->is_client) { - mutex_lock(&(src_in->rcv_lock)); - mutex_lock(&(src_in->reversedir->rcv_lock)); + spin_lock_bh(&(src_in->rcv_lock)); + spin_lock_bh(&(src_in->reversedir->rcv_lock)); } else { - mutex_lock(&(src_in->reversedir->rcv_lock)); - mutex_lock(&(src_in->rcv_lock)); + spin_lock_bh(&(src_in->reversedir->rcv_lock)); + spin_lock_bh(&(src_in->rcv_lock)); } send = unlikely(is_conn_in(src_in, nb, conn_id)); @@ -336,11 +336,11 @@ static void parse_reset(struct neighbor *nb, struct sk_buff *skb) src_in->reversedir->isreset = 1; if (src_in->is_client) { - mutex_unlock(&(src_in->rcv_lock)); - mutex_unlock(&(src_in->reversedir->rcv_lock)); + spin_unlock_bh(&(src_in->rcv_lock)); + spin_unlock_bh(&(src_in->reversedir->rcv_lock)); } else { - mutex_unlock(&(src_in->reversedir->rcv_lock)); - mutex_unlock(&(src_in->rcv_lock)); + spin_unlock_bh(&(src_in->reversedir->rcv_lock)); + spin_unlock_bh(&(src_in->rcv_lock)); } if (send) @@ -356,7 +356,7 @@ static void parse_connid_unknown(struct neighbor *nb, struct sk_buff *skb) if (trgt_out == 0) return; - mutex_lock(&(trgt_out->rcv_lock)); + spin_lock_bh(&(trgt_out->rcv_lock)); if (unlikely(trgt_out->targettype != TARGET_OUT)) goto err; @@ -368,13 +368,13 @@ static void parse_connid_unknown(struct neighbor *nb, struct sk_buff *skb) if (trgt_out->isreset == 0) trgt_out->isreset = 1; - mutex_unlock(&(trgt_out->rcv_lock)); + spin_unlock_bh(&(trgt_out->rcv_lock)); reset_conn(trgt_out); if (0) { err: - mutex_unlock(&(trgt_out->rcv_lock)); + spin_unlock_bh(&(trgt_out->rcv_lock)); } kref_put(&(trgt_out->ref), free_conn); } @@ -461,10 +461,11 @@ static void kernel_packet2(struct neighbor *nb, struct sk_buff *skb, void kernel_packet(struct neighbor *nb, struct sk_buff *skb, __u32 seqno) { - struct sk_buff *skb2 = skb_clone(skb, __GFP_DMA | GFP_KERNEL); + unsigned char *data = skb->data; + unsigned int len = skb->len; while (1) { - __u8 *codeptr = cor_pull_skb(skb2, 1); + __u8 *codeptr = cor_pull_skb(skb, 1); __u8 code; __u8 *flags; @@ -479,63 +480,63 @@ void kernel_packet(struct neighbor *nb, struct sk_buff *skb, __u32 seqno) case KP_PADDING: break; case KP_PING: - if (cor_pull_skb(skb2, 4) == 0) + if (cor_pull_skb(skb, 4) == 0) goto discard; break; case KP_PONG: - if (cor_pull_skb(skb2, 8) == 0) + if (cor_pull_skb(skb, 8) == 0) goto discard; break; case KP_ACK: - if (cor_pull_skb(skb2, 4) == 0) + if (cor_pull_skb(skb, 4) == 0) goto discard; break; case KP_ACK_CONN: 
- if (cor_pull_skb(skb2, 4) == 0) + if (cor_pull_skb(skb, 4) == 0) goto discard; - flags = cor_pull_skb(skb2, 1); + flags = cor_pull_skb(skb, 1); if (flags == 0) goto discard; - if (cor_pull_skb(skb2, ack_conn_len(*flags)) == 0) + if (cor_pull_skb(skb, ack_conn_len(*flags)) == 0) goto discard; break; case KP_CONNECT: - if (cor_pull_skb(skb2, 11) == 0) + if (cor_pull_skb(skb, 11) == 0) goto discard; break; case KP_CONNECT_SUCCESS: - if (cor_pull_skb(skb2, 15) == 0) + if (cor_pull_skb(skb, 15) == 0) goto discard; break; case KP_CONN_DATA: - if (cor_pull_skb(skb2, 8) == 0) + if (cor_pull_skb(skb, 8) == 0) goto discard; - lengthptr = cor_pull_skb(skb2, 2); + lengthptr = cor_pull_skb(skb, 2); if (lengthptr == 0) goto discard; length = ntohs(*((__u16 *)lengthptr)); - if (cor_pull_skb(skb2, length) == 0) + if (cor_pull_skb(skb, length) == 0) goto discard; break; case KP_RESET_CONN: case KP_CONNID_UNKNOWN: - if (cor_pull_skb(skb2, 4) == 0) + if (cor_pull_skb(skb, 4) == 0) goto discard; break; case KP_SET_MAX_CMSG_DELAY: - if (cor_pull_skb(skb2, 4) == 0) + if (cor_pull_skb(skb, 4) == 0) goto discard; break; default: goto discard; } } - kfree_skb(skb2); + + skb->data = data; + skb->len = len; + kernel_packet2(nb, skb, seqno); - kfree_skb(skb); - return; discard: - kfree_skb(skb2); kfree_skb(skb); } diff --git a/net/cor/neighbor.c b/net/cor/neighbor.c index 38d3a70ad0b..28e36a3e036 100644 --- a/net/cor/neighbor.c +++ b/net/cor/neighbor.c @@ -61,10 +61,11 @@ * addr [addrlen] */ +static atomic_t packets_in_workqueue = ATOMIC_INIT(0); DEFINE_MUTEX(announce_rcv_lock); DEFINE_SPINLOCK(announce_snd_lock); -DEFINE_MUTEX(neighbor_list_lock); +DEFINE_SPINLOCK(neighbor_list_lock); char *addrtype = "id"; char *addr; @@ -124,16 +125,15 @@ static struct neighbor *alloc_neighbor(gfp_t allocflags) init_timer(&(nb->cmsg_timer)); nb->cmsg_timer.function = controlmsg_timerfunc; nb->cmsg_timer.data = (unsigned long) nb; - INIT_WORK(&(nb->cmsg_work), controlmsg_workfunc); - atomic_set(&(nb->cmsg_work_scheduled), 0); + tasklet_init(&(nb->cmsg_task), controlmsg_taskfunc, (unsigned long) nb); + atomic_set(&(nb->cmsg_task_scheduled), 0); atomic_set(&(nb->cmsg_timer_running), 0); - mutex_init(&(nb->cmsg_lock)); - mutex_init(&(nb->send_cmsg_lock)); + spin_lock_init(&(nb->cmsg_lock)); + spin_lock_init(&(nb->send_cmsg_lock)); INIT_LIST_HEAD(&(nb->control_msgs_out)); INIT_LIST_HEAD(&(nb->ucontrol_msgs_out)); nb->last_ping_time = jiffies; nb->cmsg_interval = 1000000; - atomic_set(&(nb->ooo_packets), 0); spin_lock_init(&(nb->credits_lock)); nb->ktime_credit_update = ktime_get(); nb->ktime_credit_decay = nb->ktime_credit_decay; @@ -183,7 +183,7 @@ struct neighbor *get_neigh_by_mac(struct sk_buff *skb) skb->dev->header_ops->parse != 0) skb->dev->header_ops->parse(skb, source_hw); - mutex_lock(&(neighbor_list_lock)); + spin_lock_bh(&(neighbor_list_lock)); currlh = nb_list.next; @@ -201,7 +201,7 @@ struct neighbor *get_neigh_by_mac(struct sk_buff *skb) currlh = currlh->next; } - mutex_unlock(&(neighbor_list_lock)); + spin_unlock_bh(&(neighbor_list_lock)); return ret; } @@ -222,7 +222,7 @@ struct neighbor *find_neigh(__u16 addrtypelen, __u8 *addrtype, if (get_addrtype(addrtypelen, addrtype) != ADDRTYPE_ID) return 0; - mutex_lock(&(neighbor_list_lock)); + spin_lock_bh(&(neighbor_list_lock)); currlh = nb_list.next; @@ -242,7 +242,7 @@ struct neighbor *find_neigh(__u16 addrtypelen, __u8 *addrtype, } out: - mutex_unlock(&(neighbor_list_lock)); + spin_unlock_bh(&(neighbor_list_lock)); return ret; } @@ -304,7 +304,7 @@ __u32 
generate_neigh_list(char *buf, __u32 buflen, __u32 limit, __u32 offset) BUG_ON(rc <= 0); buf_offset += rc; - mutex_lock(&(neighbor_list_lock)); + spin_lock_bh(&(neighbor_list_lock)); currlh = nb_list.next; @@ -382,7 +382,7 @@ cont2: currlh = currlh->next; } - mutex_unlock(&(neighbor_list_lock)); + spin_unlock_bh(&(neighbor_list_lock)); rc = encode_len(buf, 4, total); BUG_ON(rc <= 0); @@ -510,11 +510,11 @@ static void reset_all_conns(struct neighbor *nb) spin_unlock_irqrestore(&(nb->conn_list_lock), iflags); if (src_in->is_client) { - mutex_lock(&(src_in->rcv_lock)); - mutex_lock(&(src_in->reversedir->rcv_lock)); + spin_lock_bh(&(src_in->rcv_lock)); + spin_lock_bh(&(src_in->reversedir->rcv_lock)); } else { - mutex_lock(&(src_in->reversedir->rcv_lock)); - mutex_lock(&(src_in->rcv_lock)); + spin_lock_bh(&(src_in->reversedir->rcv_lock)); + spin_lock_bh(&(src_in->rcv_lock)); } if (unlikely(unlikely(src_in->sourcetype != SOURCE_IN) || @@ -534,11 +534,11 @@ static void reset_all_conns(struct neighbor *nb) unlock: if (src_in->is_client) { - mutex_unlock(&(src_in->rcv_lock)); - mutex_unlock(&(src_in->reversedir->rcv_lock)); + spin_unlock_bh(&(src_in->rcv_lock)); + spin_unlock_bh(&(src_in->reversedir->rcv_lock)); } else { - mutex_unlock(&(src_in->reversedir->rcv_lock)); - mutex_unlock(&(src_in->rcv_lock)); + spin_unlock_bh(&(src_in->reversedir->rcv_lock)); + spin_unlock_bh(&(src_in->rcv_lock)); } if (rc == 0) { @@ -571,10 +571,10 @@ static void reset_neighbor(struct neighbor *nb) reset_all_conns(nb); if (removenblist) { - mutex_lock(&neighbor_list_lock); + spin_lock_bh(&neighbor_list_lock); list_del(&(nb->nb_list)); refresh_initial_debitsrate(); - mutex_unlock(&neighbor_list_lock); + spin_unlock_bh(&neighbor_list_lock); #warning todo empty control_msg list @@ -587,7 +587,7 @@ static void reset_neighbor_dev(struct net_device *dev) struct list_head *currlh; restart: - mutex_lock(&neighbor_list_lock); + spin_lock_bh(&neighbor_list_lock); currlh = nb_list.next; @@ -605,7 +605,7 @@ restart: spin_unlock_irqrestore(&(currnb->state_lock), iflags); if (state != NEIGHBOR_STATE_KILLED) { - mutex_unlock(&neighbor_list_lock); + spin_unlock_bh(&neighbor_list_lock); reset_neighbor(currnb); goto restart; } @@ -614,7 +614,7 @@ cont: currlh = currlh->next; } - mutex_unlock(&neighbor_list_lock); + spin_unlock_bh(&neighbor_list_lock); } static void stall_timer(struct work_struct *work) @@ -982,7 +982,7 @@ static void add_neighbor(struct neighbor *nb) { struct list_head *currlh; - mutex_lock(&neighbor_list_lock); + spin_lock_bh(&neighbor_list_lock); currlh = nb_list.next; @@ -1002,14 +1002,23 @@ static void add_neighbor(struct neighbor *nb) /* kref_get not needed here, because the caller leaves its ref to us */ printk(KERN_ERR "add_neigh"); - INIT_DELAYED_WORK(&(nb->retrans_timer), retransmit_timerfunc); - INIT_DELAYED_WORK(&(nb->retrans_timer_conn), retransmit_conn_timerfunc); + init_timer(&(nb->retrans_timer)); + nb->retrans_timer.function = retransmit_timerfunc; + nb->retrans_timer.data = (unsigned long) nb; + tasklet_init(&(nb->retrans_task), retransmit_conn_taskfunc, + (unsigned long) nb); - mutex_lock(&(nb->cmsg_lock)); + init_timer(&(nb->retrans_timer_conn)); + nb->retrans_timer_conn.function = retransmit_conn_timerfunc; + nb->retrans_timer_conn.data = (unsigned long) nb; + tasklet_init(&(nb->retrans_task_conn), retransmit_conn_taskfunc, + (unsigned long) nb); + + spin_lock_bh(&(nb->cmsg_lock)); nb->last_ping_time = jiffies; nb->cmsg_interval = 1000000; schedule_controlmsg_timer(nb); - 
mutex_unlock(&(nb->cmsg_lock)); + spin_unlock_bh(&(nb->cmsg_lock)); list_add_tail(&(nb->nb_list), &nb_list); @@ -1020,7 +1029,7 @@ already_present: kmem_cache_free(nb_slab, nb); } - mutex_unlock(&neighbor_list_lock); + spin_unlock_bh(&neighbor_list_lock); } static __u32 pull_u32(struct sk_buff *skb, int convbo) @@ -1249,14 +1258,14 @@ static void merge_announce(struct announce_in *ann) ps = skb_pstate(skb); currcpy = skb->len; - if (unlikely(ps->funcstate.announce.offset > copy)) { + if (unlikely(ps->funcstate.announce2.offset > copy)) { printk(KERN_ERR "net/cor/neighbor.c: invalid offset" "value found\n"); goto free; } - if (unlikely(ps->funcstate.announce.offset < copy)) { - offset = copy - ps->funcstate.announce.offset; + if (unlikely(ps->funcstate.announce2.offset < copy)) { + offset = copy - ps->funcstate.announce2.offset; currcpy -= offset; } @@ -1279,11 +1288,11 @@ free: kmem_cache_free(announce_in_slab, ann); } -static int _rcv_announce(struct sk_buff *skb, struct announce_in *ann) +static int __rcv_announce(struct sk_buff *skb, struct announce_in *ann) { struct skb_procstate *ps = skb_pstate(skb); - __u32 offset = ps->funcstate.announce.offset; + __u32 offset = ps->funcstate.announce2.offset; __u32 len = skb->len; __u32 curroffset = 0; @@ -1310,7 +1319,7 @@ static int _rcv_announce(struct sk_buff *skb, struct announce_in *ann) while ((void *) curr != (void *) &(ann->skbs)) { struct skb_procstate *currps = skb_pstate(skb); - curroffset = currps->funcstate.announce.offset; + curroffset = currps->funcstate.announce2.offset; if (curroffset > offset && (prevoffset + prevlen) < curroffset) break; @@ -1352,9 +1361,12 @@ static int _rcv_announce(struct sk_buff *skb, struct announce_in *ann) return 0; } -void rcv_announce(struct sk_buff *skb) +static void _rcv_announce(struct work_struct *work) { - struct skb_procstate *ps = skb_pstate(skb); + struct skb_procstate *ps = container_of(work, + struct skb_procstate, funcstate.announce1.work); + struct sk_buff *skb = skb_from_pstate(ps); + struct announce_in *curr = 0; struct announce_in *leastactive = 0; __u32 list_size = 0; @@ -1369,7 +1381,7 @@ void rcv_announce(struct sk_buff *skb) skb->dev->header_ops->parse != 0) skb->dev->header_ops->parse(skb, source_hw); - ps->funcstate.announce.offset = pull_u32(skb, 1); + ps->funcstate.announce2.offset = pull_u32(skb, 1); if (total_size > 8192) goto discard; @@ -1411,8 +1423,7 @@ void rcv_announce(struct sk_buff *skb) dev_put(curr->dev); } else { - curr = kmem_cache_alloc(announce_in_slab, - GFP_KERNEL); + curr = kmem_cache_alloc(announce_in_slab, GFP_KERNEL); if (curr == 0) goto discard; @@ -1429,7 +1440,7 @@ void rcv_announce(struct sk_buff *skb) memcpy(curr->source_hw, source_hw, MAX_ADDR_LEN); found: - if (_rcv_announce(skb, curr)) { + if (__rcv_announce(skb, curr)) { list_del((struct list_head *) curr); dev_put(curr->dev); kmem_cache_free(announce_in_slab, curr); @@ -1441,6 +1452,28 @@ void rcv_announce(struct sk_buff *skb) } mutex_unlock(&(announce_rcv_lock)); + + atomic_dec(&packets_in_workqueue); +} + +int rcv_announce(struct sk_buff *skb) +{ + struct skb_procstate *ps = skb_pstate(skb); + long queuelen; + + queuelen = atomic_inc_return(&packets_in_workqueue); + + BUG_ON(queuelen <= 0); + + if (queuelen > MAX_PACKETS_IN_RCVQUEUE) { + atomic_dec(&packets_in_workqueue); + kfree_skb(skb); + return NET_RX_SUCCESS; + } + + INIT_WORK(&(ps->funcstate.announce1.work), _rcv_announce); + schedule_work(&(ps->funcstate.announce1.work)); + return NET_RX_SUCCESS; } struct announce{ diff --git 
a/net/cor/rcv.c b/net/cor/rcv.c index ac1fa865e14..195fbd30e2c 100644 --- a/net/cor/rcv.c +++ b/net/cor/rcv.c @@ -27,16 +27,12 @@ #include "cor.h" -static atomic_t packets_in_workqueue = ATOMIC_INIT(0); - -static atomic_t ooo_packets = ATOMIC_INIT(0); - -static struct workqueue_struct *packet_wq; - static struct work_struct outofbufferspace_work; -DEFINE_SPINLOCK(oobss_lock); +DEFINE_SPINLOCK(oobs_sched_lock); static int outofbufferspace_scheduled; +DEFINE_MUTEX(oobs_lock); + /** * buffering space is divided in 4 areas: * @@ -65,17 +61,16 @@ static int outofbufferspace_scheduled; * also used up, connections will be reset. */ -DEFINE_MUTEX(buffer_conn_list_lock); -LIST_HEAD(buffer_conn_list); +DEFINE_SPINLOCK(bufferlimits_lock); +int oobs_listlock; /** - * used to buffer inserts when main list is locked, moved to main list, after - * processing of main list finishes + * If oobs_listlock, buffer_conn_list must not be touched by anything + * execept outofbufferspace. In this time new conns are inserted to + * buffer_conn_tmp_list and later moved to buffer_conn_list. */ -LIST_HEAD(buffer_conn_tmp_list); /* protected by bufferlimits_lock */ - - -DEFINE_MUTEX(bufferlimits_lock); +LIST_HEAD(buffer_conn_list); +LIST_HEAD(buffer_conn_tmp_list); static __u64 bufferassigned_init; static __u64 bufferassigned_speed; @@ -241,7 +236,10 @@ static void _outofbufferspace(void) memset(&offendingconns, 0, sizeof(offendingconns)); - mutex_lock(&buffer_conn_list_lock); + spin_lock_bh(&bufferlimits_lock); + BUG_ON(oobs_listlock == 1); + oobs_listlock = 1; + spin_unlock_bh(&bufferlimits_lock); curr = buffer_conn_list.next; while (curr != &buffer_conn_list) { @@ -257,7 +255,7 @@ static void _outofbufferspace(void) curr = curr->next; - mutex_lock(&(src_in_o->rcv_lock)); + spin_lock_bh(&(src_in_o->rcv_lock)); BUG_ON(src_in_o->sourcetype != SOURCE_IN); @@ -267,7 +265,7 @@ static void _outofbufferspace(void) speed = get_speed(&(src_in_o->source.in.st), jiffies); spin_unlock_irqrestore(&st_lock, iflags); - mutex_unlock(&(src_in_o->rcv_lock)); + spin_unlock_bh(&(src_in_o->rcv_lock)); if (offendingconns[OOBS_SIZE-1] != 0 && compare_scores( @@ -311,7 +309,10 @@ static void _outofbufferspace(void) kref_get(&(offendingconns[i]->ref)); } - mutex_unlock(&buffer_conn_list_lock); + spin_lock_bh(&bufferlimits_lock); + BUG_ON(oobs_listlock == 0); + oobs_listlock = 0; + spin_unlock_bh(&bufferlimits_lock); for (i=0;i BUFFERSPACE_RESERVE); - mutex_unlock(&bufferlimits_lock); + spin_unlock_bh(&bufferlimits_lock); if (resetneeded) reset_conn(offendingconns[i]); kref_put(&(offendingconns[i]->ref), free_conn); } - mutex_lock(&bufferlimits_lock); - mutex_lock(&buffer_conn_list_lock); - while(list_empty(&buffer_conn_tmp_list) == 0) { + spin_lock_bh(&bufferlimits_lock); + BUG_ON(oobs_listlock == 1); + for (i=0;i<1000 && list_empty(&buffer_conn_tmp_list) == 0;i++) { curr = buffer_conn_tmp_list.next; list_del(curr); list_add(curr, &buffer_conn_list); } - mutex_unlock(&buffer_conn_list_lock); - mutex_unlock(&bufferlimits_lock); + spin_unlock_bh(&bufferlimits_lock); } static void outofbufferspace(struct work_struct *work) { + mutex_lock(&oobs_lock); while (1) { unsigned long iflags; int resetneeded; - mutex_lock(&bufferlimits_lock); - spin_lock_irqsave(&oobss_lock, iflags); + spin_lock_bh(&bufferlimits_lock); + spin_lock_irqsave(&oobs_sched_lock, iflags); resetneeded = (bufferusage_reserve > BUFFERSPACE_RESERVE); if (resetneeded == 0) outofbufferspace_scheduled = 0; - spin_unlock_irqrestore(&oobss_lock, iflags); - 
mutex_unlock(&bufferlimits_lock); + spin_unlock_irqrestore(&oobs_sched_lock, iflags); + spin_unlock_bh(&bufferlimits_lock); if (resetneeded == 0) - return; + break; _outofbufferspace(); } + mutex_unlock(&oobs_lock); } static void refresh_bufferusage(struct conn *src_in_l) @@ -418,13 +420,13 @@ static void refresh_bufferusage(struct conn *src_in_l) if (bufferusage_reserve > BUFFERSPACE_RESERVE) { unsigned long iflags; - spin_lock_irqsave(&oobss_lock, iflags); + spin_lock_irqsave(&oobs_sched_lock, iflags); if (outofbufferspace_scheduled == 0) { schedule_work(&outofbufferspace_work); outofbufferspace_scheduled = 1; } - spin_unlock_irqrestore(&oobss_lock, iflags); + spin_unlock_irqrestore(&oobs_sched_lock, iflags); } } @@ -459,7 +461,7 @@ static __u8 __get_window(struct conn *src_in_l) #warning todo upper buffer limits static __u8 _get_window(struct conn *cn, struct neighbor *expectedsender, - __u32 expected_connid, int from_acksend, int listlocked) + __u32 expected_connid, int from_acksend) { unsigned long iflags; @@ -475,7 +477,7 @@ static __u8 _get_window(struct conn *cn, struct neighbor *expectedsender, __u64 bufferlimit_speed; __u64 connlimit_speed; - mutex_lock(&(cn->rcv_lock)); + spin_lock_bh(&(cn->rcv_lock)); BUG_ON(expectedsender == 0 && cn->sourcetype != SOURCE_IN); @@ -486,7 +488,8 @@ static __u8 _get_window(struct conn *cn, struct neighbor *expectedsender, goto out; if (unlikely(cn->isreset != 0)) { - if (listlocked && (cn->source.in.buffer_list.next != 0 || + if (oobs_listlock == 0 && ( + cn->source.in.buffer_list.next != 0 || cn->source.in.buffer_list.prev != 0)) { list_del(&(cn->source.in.buffer_list)); cn->source.in.buffer_list.next = 0; @@ -496,7 +499,7 @@ static __u8 _get_window(struct conn *cn, struct neighbor *expectedsender, goto out; } - if (listlocked){ + if (oobs_listlock == 0){ if (cn->source.in.buffer_list.next != 0 || cn->source.in.buffer_list.prev != 0) { list_del(&(cn->source.in.buffer_list)); @@ -558,7 +561,7 @@ static __u8 _get_window(struct conn *cn, struct neighbor *expectedsender, send_ack_conn_ifneeded(cn); out: - mutex_unlock(&(cn->rcv_lock)); + spin_unlock_bh(&(cn->rcv_lock)); return window; } @@ -567,17 +570,14 @@ __u8 get_window(struct conn *cn, struct neighbor *expectedsender, __u32 expected_connid, int from_acksend) { struct conn *cn2; - int listlocked; __u8 window; - mutex_lock(&bufferlimits_lock); - listlocked = mutex_trylock(&buffer_conn_list_lock); + spin_lock_bh(&bufferlimits_lock); - window = _get_window(cn, expectedsender, expected_connid, from_acksend, - listlocked); + window = _get_window(cn, expectedsender, expected_connid, from_acksend); - if (listlocked) { + if (oobs_listlock == 0) { /** * refresh window of idle conns as well to keep global counters * accurate @@ -587,31 +587,26 @@ __u8 get_window(struct conn *cn, struct neighbor *expectedsender, source.in.buffer_list); if (list_empty(&buffer_conn_list) == 0 && cn2 != cn) - _get_window(cn2, 0, 0, 0, listlocked); + _get_window(cn2, 0, 0, 0); if (list_empty(&buffer_conn_tmp_list) == 0) { cn2 = container_of(buffer_conn_tmp_list.next, struct conn, source.in.buffer_list); BUG_ON(cn2 == cn); - _get_window(cn2, 0, 0, 0, listlocked); + _get_window(cn2, 0, 0, 0); } - - mutex_unlock(&buffer_conn_list_lock); } - mutex_unlock(&bufferlimits_lock); + spin_unlock_bh(&bufferlimits_lock); return window; } void reset_bufferusage(struct conn *cn) { - int listlocked; - - mutex_lock(&bufferlimits_lock); - listlocked = mutex_trylock(&buffer_conn_list_lock); - mutex_lock(&(cn->rcv_lock)); + 
spin_lock_bh(&bufferlimits_lock); + spin_lock_bh(&(cn->rcv_lock)); if (cn->sourcetype != SOURCE_IN) goto out; @@ -625,7 +620,7 @@ void reset_bufferusage(struct conn *cn) bufferassigned_speed -= cn->source.in.buffer_speed; bufferassigned_ata -= cn->source.in.buffer_ata; - if (listlocked && (cn->source.in.buffer_list.next != 0 || + if (oobs_listlock == 0 && (cn->source.in.buffer_list.next != 0 || cn->source.in.buffer_list.prev != 0)) { list_del(&(cn->source.in.buffer_list)); cn->source.in.buffer_list.next = 0; @@ -634,10 +629,8 @@ void reset_bufferusage(struct conn *cn) } out: - mutex_unlock(&(cn->rcv_lock)); - if (listlocked) - mutex_unlock(&buffer_conn_list_lock); - mutex_unlock(&bufferlimits_lock); + spin_unlock_bh(&(cn->rcv_lock)); + spin_unlock_bh(&bufferlimits_lock); } void refresh_speedstat(struct conn *src_in_l, __u32 written) @@ -678,7 +671,7 @@ void reset_ooo_queue(struct conn *src_in_l) struct skb_procstate *ps = skb_pstate(skb); int drop; - if (src_in_l->source.in.next_seqno != ps->funcstate.rcv2.seqno) + if (src_in_l->source.in.next_seqno != ps->funcstate.rcv.seqno) break; drop = receive_skb(src_in_l, skb); @@ -686,10 +679,6 @@ void reset_ooo_queue(struct conn *src_in_l) break; skb_unlink(skb, &(src_in_l->source.in.reorder_queue)); - src_in_l->source.in.ooo_packets--; - atomic_dec(&(src_in_l->source.in.nb->ooo_packets)); - atomic_dec(&ooo_packets); - src_in_l->source.in.next_seqno += skb->len; } } @@ -703,9 +692,6 @@ void drain_ooo_queue(struct conn *src_in_l) struct sk_buff *skb = src_in_l->source.in.reorder_queue.next; skb_unlink(skb, &(src_in_l->source.in.reorder_queue)); kfree_skb(skb); - src_in_l->source.in.ooo_packets--; - atomic_dec(&(src_in_l->source.in.nb->ooo_packets)); - atomic_dec(&ooo_packets); } } @@ -716,22 +702,7 @@ static int _conn_rcv_ooo(struct conn *src_in_l, struct sk_buff *skb) &(src_in_l->source.in.reorder_queue); struct sk_buff *curr = reorder_queue->next; - long ooo; - - #warning todo limit amount of data, not packet count - src_in_l->source.in.ooo_packets++; - if (src_in_l->source.in.ooo_packets > MAX_TOTAL_OOO_PER_CONN) - goto drop_ooo3; - - ooo = atomic_inc_return(&(src_in_l->source.in.nb->ooo_packets)); - if (ooo > MAX_TOTAL_OOO_PER_NEIGH) - goto drop_ooo2; - - ooo = atomic_inc_return(&ooo_packets); - if (ooo > MAX_TOTAL_OOO_PACKETS) - goto drop_ooo1; - - + #warning todo min rcv size while (1) { struct skb_procstate *ps2 = skb_pstate(curr); @@ -740,7 +711,7 @@ static int _conn_rcv_ooo(struct conn *src_in_l, struct sk_buff *skb) break; } - if (ps->funcstate.rcv2.seqno > ps2->funcstate.rcv2.seqno) { + if (ps->funcstate.rcv.seqno > ps2->funcstate.rcv.seqno) { skb_insert(curr, skb, reorder_queue); break; } @@ -748,17 +719,6 @@ static int _conn_rcv_ooo(struct conn *src_in_l, struct sk_buff *skb) curr = curr->next; } - if (0) { -drop_ooo1: - atomic_dec(&ooo_packets); -drop_ooo2: - atomic_dec(&(src_in_l->source.in.nb->ooo_packets)); -drop_ooo3: - src_in_l->source.in.ooo_packets--; - - return 1; - } - return 0; } @@ -772,7 +732,7 @@ static void _conn_rcv(struct neighbor *nb, struct conn *src_in, __u32 len = skb->len; - mutex_lock(&(src_in->rcv_lock)); + spin_lock_bh(&(src_in->rcv_lock)); if (unlikely(unlikely(src_in->isreset != 0) || unlikely(src_in->sourcetype != SOURCE_IN) || @@ -789,11 +749,11 @@ static void _conn_rcv(struct neighbor *nb, struct conn *src_in, set_last_act(src_in); - if (((__s32) (ps->funcstate.rcv2.seqno + skb->len - + if (((__s32) (ps->funcstate.rcv.seqno + skb->len - src_in->source.in.window_seqnolimit)) > 0) goto out; - in_order = 
(src_in->source.in.next_seqno == ps->funcstate.rcv2.seqno); + in_order = (src_in->source.in.next_seqno == ps->funcstate.rcv.seqno); if (in_order == 0) { drop = _conn_rcv_ooo(src_in, skb); } else { @@ -810,7 +770,7 @@ static void _conn_rcv(struct neighbor *nb, struct conn *src_in, ACM_PRIORITY_LOW); send_ack_conn_ooo(cm, src_in, src_in->reversedir->target.out.conn_id, - ps->funcstate.rcv2.seqno, len); + ps->funcstate.rcv.seqno, len); } else { drain_ooo_queue(src_in); src_in->source.in.inorder_ack_needed = 1; @@ -819,7 +779,7 @@ static void _conn_rcv(struct neighbor *nb, struct conn *src_in, out: send_ack_conn_ifneeded(src_in); - mutex_unlock(&(src_in->rcv_lock)); + spin_unlock_bh(&(src_in->rcv_lock)); if (unlikely(drop)) { kfree_skb(skb); @@ -836,7 +796,7 @@ static void conn_rcv(struct neighbor *nb, struct sk_buff *skb, __u32 conn_id, memset(ps, 0, sizeof(struct skb_procstate)); - ps->funcstate.rcv2.seqno = seqno; + ps->funcstate.rcv.seqno = seqno; cn_src_in = get_conn(conn_id); @@ -866,7 +826,7 @@ static void conn_rcv(struct neighbor *nb, struct sk_buff *skb, __u32 conn_id, void conn_rcv_buildskb(struct neighbor *nb, char *data, __u32 datalen, __u32 conn_id, __u32 seqno) { - struct sk_buff *skb = alloc_skb(datalen, GFP_KERNEL); + struct sk_buff *skb = alloc_skb(datalen, GFP_ATOMIC); char *dst = skb_put(skb, datalen); memcpy(dst, data, datalen); conn_rcv(nb, skb, conn_id, seqno); @@ -919,15 +879,14 @@ drop: } } -static void rcv(struct work_struct *work) +static int rcv(struct sk_buff *skb, struct net_device *dev, + struct packet_type *pt, struct net_device *orig_dev) { - struct sk_buff *skb = skb_from_pstate(container_of(work, - struct skb_procstate, funcstate.rcv.work)); - __u8 packet_type; char *packet_type_p; - atomic_dec(&packets_in_workqueue); + if (skb->pkt_type == PACKET_OTHERHOST) + goto drop; packet_type_p = cor_pull_skb(skb, 1); @@ -936,45 +895,17 @@ static void rcv(struct work_struct *work) packet_type = *packet_type_p; - if (packet_type == PACKET_TYPE_ANNOUNCE) { + if (unlikely(packet_type == PACKET_TYPE_ANNOUNCE)) { rcv_announce(skb); - return; + return NET_RX_SUCCESS; } - if (unlikely(packet_type != PACKET_TYPE_DATA)) - goto drop; - - rcv_data(skb); - - if (0) { -drop: - kfree_skb(skb); + if (likely(packet_type == PACKET_TYPE_DATA)) { + rcv_data(skb); + return NET_RX_SUCCESS; } -} - -static int queue_rcv_processing(struct sk_buff *skb, struct net_device *dev, - struct packet_type *pt, struct net_device *orig_dev) -{ - struct skb_procstate *ps = skb_pstate(skb); - long queuelen; - if (skb->pkt_type == PACKET_OTHERHOST) - goto drop; - - BUG_ON(skb->next != 0); - - queuelen = atomic_inc_return(&packets_in_workqueue); - - BUG_ON(queuelen <= 0); - - #warning todo limit per interface, inbound credits - if (queuelen > MAX_PACKETS_IN_RCVQUEUE) { - atomic_dec(&packets_in_workqueue); - goto drop; - } - - INIT_WORK(&(ps->funcstate.rcv.work), rcv); - queue_work(packet_wq, &(ps->funcstate.rcv.work)); + kfree_skb(skb); return NET_RX_SUCCESS; drop: @@ -985,11 +916,13 @@ drop: static struct packet_type ptype_cor = { .type = htons(ETH_P_COR), .dev = 0, - .func = queue_rcv_processing + .func = rcv }; int __init cor_rcv_init(void) { + oobs_listlock = 0; + bufferassigned_init = 0; bufferassigned_speed = 0; bufferassigned_ata = 0; @@ -1002,7 +935,6 @@ int __init cor_rcv_init(void) memset(&st, 0, sizeof(struct speedtracker)); BUG_ON(sizeof(struct skb_procstate) > 48); - packet_wq = create_workqueue("cor_packet"); INIT_WORK(&outofbufferspace_work, outofbufferspace); outofbufferspace_scheduled = 0; 
diff --git a/net/cor/snd.c b/net/cor/snd.c index da16bd01682..5429ce6c5c4 100644 --- a/net/cor/snd.c +++ b/net/cor/snd.c @@ -46,9 +46,10 @@ static void free_connretrans(struct kref *ref) kref_put(&(cr->trgt_out_o->ref), free_conn); } -DEFINE_MUTEX(queues_lock); +DEFINE_SPINLOCK(queues_lock); LIST_HEAD(queues); -struct delayed_work qos_resume_work; +struct timer_list qos_resume_timer; +struct tasklet_struct qos_resume_task; int qos_resume_scheduled; void free_qos(struct kref *ref) @@ -77,14 +78,14 @@ static int _resume_conns(struct qos_queue *q) refresh_conn_credits(trgt_out_o, 0, 0); - mutex_lock(&(trgt_out_o->rcv_lock)); + spin_lock_bh(&(trgt_out_o->rcv_lock)); BUG_ON(trgt_out_o->targettype != TARGET_OUT); if (unlikely(trgt_out_o->isreset != 0)) { trgt_out_o->target.out.rb.in_queue = 0; list_del(&(trgt_out_o->target.out.rb.lh)); - mutex_unlock(&(trgt_out_o->rcv_lock)); + spin_unlock_bh(&(trgt_out_o->rcv_lock)); kref_put(&(trgt_out_o->ref), free_conn); continue; @@ -101,7 +102,7 @@ static int _resume_conns(struct qos_queue *q) else credits = multiply_div(trgt_out_o->credits, 1LL << 24, trgt_out_o->data_buf.read_remaining); - mutex_unlock(&(trgt_out_o->rcv_lock)); + spin_unlock_bh(&(trgt_out_o->rcv_lock)); if (best == 0 || bestcredit < credits) { secondcredit = bestcredit; @@ -115,14 +116,14 @@ static int _resume_conns(struct qos_queue *q) if (best == 0) return RC_FLUSH_CONN_OUT_OK; - mutex_lock(&(best->rcv_lock)); + spin_lock_bh(&(best->rcv_lock)); rc = flush_out(best, 1, (__u32) (secondcredit >> 32)); if (rc == RC_FLUSH_CONN_OUT_OK || rc == RC_FLUSH_CONN_OUT_OK_SENT) { best->target.out.rb.in_queue = 0; list_del(&(best->target.out.rb.lh)); } - mutex_unlock(&(best->rcv_lock)); + spin_unlock_bh(&(best->rcv_lock)); refresh_conn_credits(best, 0, 0); @@ -270,13 +271,13 @@ static int _qos_resume(struct qos_queue *q) return rc; } -static void qos_resume(struct work_struct *work) +void qos_resume_taskfunc(unsigned long arg) { struct list_head *curr; int congested = 0; - mutex_lock(&(queues_lock)); + spin_lock_bh(&(queues_lock)); curr = queues.next; while (curr != (&queues)) { @@ -290,12 +291,17 @@ static void qos_resume(struct work_struct *work) } if (congested) { - schedule_delayed_work(&(qos_resume_work), 1); + mod_timer(&(qos_resume_timer), jiffies + 1); } else { qos_resume_scheduled = 0; } - mutex_unlock(&(queues_lock)); + spin_unlock_bh(&(queues_lock)); +} + +void qos_resume_timerfunc(unsigned long arg) +{ + tasklet_schedule(&qos_resume_task); } struct qos_queue *get_queue(struct net_device *dev) @@ -303,7 +309,7 @@ struct qos_queue *get_queue(struct net_device *dev) struct qos_queue *ret = 0; struct list_head *curr; - mutex_lock(&(queues_lock)); + spin_lock_bh(&(queues_lock)); curr = queues.next; while (curr != (&queues)) { struct qos_queue *q = container_of(curr, @@ -313,7 +319,7 @@ struct qos_queue *get_queue(struct net_device *dev) break; } } - mutex_unlock(&(queues_lock)); + spin_unlock_bh(&(queues_lock)); return ret; } @@ -371,9 +377,9 @@ int destroy_queue(struct net_device *dev) if (unlink) { dev_put(dev); - mutex_lock(&(queues_lock)); + spin_lock_bh(&(queues_lock)); list_del(&(q->queue_list)); - mutex_unlock(&(queues_lock)); + spin_unlock_bh(&(queues_lock)); kref_put(&(q->ref), free_qos); } kref_put(&(q->ref), free_qos); @@ -401,9 +407,9 @@ int create_queue(struct net_device *dev) INIT_LIST_HEAD(&(q->announce_waiting)); INIT_LIST_HEAD(&(q->conns_waiting)); - mutex_lock(&(queues_lock)); + spin_lock_bh(&(queues_lock)); list_add(&(q->queue_list), &queues); - mutex_unlock(&(queues_lock)); 
+ spin_unlock_bh(&(queues_lock)); return 0; } @@ -436,7 +442,7 @@ void qos_enqueue(struct qos_queue *q, struct resume_block *rb, int caller) } if (qos_resume_scheduled == 0) { - schedule_delayed_work(&(qos_resume_work), 1); + mod_timer(&(qos_resume_timer), jiffies + 1); qos_resume_scheduled = 1; } @@ -548,11 +554,9 @@ static void set_conn_retrans_timeout(struct conn_retrans *cr) static struct conn_retrans *readd_conn_retrans(struct conn_retrans *cr, struct neighbor *nb, __u32 length, int *dontsend) { - unsigned long iflags; - struct conn_retrans *ret = 0; - spin_lock_irqsave(&(nb->retrans_lock), iflags); + spin_lock_bh(&(nb->retrans_lock)); if (unlikely(cr->ackrcvd)) { *dontsend = 1; @@ -587,17 +591,16 @@ static struct conn_retrans *readd_conn_retrans(struct conn_retrans *cr, } out: - spin_unlock_irqrestore(&(nb->retrans_lock), iflags); + spin_unlock_bh(&(nb->retrans_lock)); return ret; } void cancel_retrans(struct conn *trgt_out_l) { - unsigned long iflags; struct neighbor *nb = trgt_out_l->target.out.nb; - spin_lock_irqsave(&(nb->retrans_lock), iflags); + spin_lock_bh(&(nb->retrans_lock)); while (list_empty(&(trgt_out_l->target.out.retrans_list)) == 0) { struct conn_retrans *cr = container_of( @@ -612,7 +615,7 @@ void cancel_retrans(struct conn *trgt_out_l) } #warning reschedule timer - spin_unlock_irqrestore(&(nb->retrans_lock), iflags); + spin_unlock_bh(&(nb->retrans_lock)); } static int _send_retrans(struct neighbor *nb, struct conn_retrans *cr) @@ -621,7 +624,7 @@ static int _send_retrans(struct neighbor *nb, struct conn_retrans *cr) int dontsend; int queuefull = 0; - mutex_lock(&(cr->trgt_out_o->rcv_lock)); + spin_lock_bh(&(cr->trgt_out_o->rcv_lock)); BUG_ON(cr->trgt_out_o->targettype != TARGET_OUT); BUG_ON(cr->trgt_out_o->target.out.nb != nb); @@ -642,7 +645,7 @@ static int _send_retrans(struct neighbor *nb, struct conn_retrans *cr) if (may_send_conn_retrans(nb) == 0) goto qos_enqueue; - skb = create_packet(nb, targetmss, GFP_KERNEL, + skb = create_packet(nb, targetmss, GFP_ATOMIC, cr->trgt_out_o->target.out.conn_id, cr->seqno); if (unlikely(skb == 0)) { cr->timeout = jiffies + 1; @@ -662,9 +665,7 @@ static int _send_retrans(struct neighbor *nb, struct conn_retrans *cr) rc = dev_queue_xmit(skb); if (rc != 0) { - unsigned long iflags; - - spin_lock_irqsave(&(nb->retrans_lock), iflags); + spin_lock_bh(&(nb->retrans_lock)); if (unlikely(cr->ackrcvd)) { dontsend = 1; } else { @@ -672,7 +673,7 @@ static int _send_retrans(struct neighbor *nb, struct conn_retrans *cr) list_add(&(cr->timeout_list), &(nb->retrans_list_conn)); } - spin_unlock_irqrestore(&(nb->retrans_lock), iflags); + spin_unlock_bh(&(nb->retrans_lock)); if (dontsend == 0) goto qos_enqueue; } @@ -687,7 +688,7 @@ static int _send_retrans(struct neighbor *nb, struct conn_retrans *cr) BUG(); } else { struct control_msg_out *cm; - char *buf = kmalloc(cr->length, GFP_KERNEL); + char *buf = kmalloc(cr->length, GFP_ATOMIC); if (unlikely(buf == 0)) { cr->timeout = jiffies + 1; @@ -718,7 +719,7 @@ qos_enqueue: queuefull = 1; } out: - mutex_unlock(&(cr->trgt_out_o->rcv_lock)); + spin_unlock_bh(&(cr->trgt_out_o->rcv_lock)); kref_put(&(cr->trgt_out_o->ref), free_conn); @@ -740,11 +741,11 @@ static int send_retrans(struct neighbor *nb, int fromqos) spin_unlock_irqrestore(&(nb->state_lock), iflags); while (1) { - spin_lock_irqsave(&(nb->retrans_lock), iflags); + spin_lock_bh(&(nb->retrans_lock)); if (list_empty(&(nb->retrans_list_conn))) { nb->retrans_timer_conn_running = 0; - spin_unlock_irqrestore(&(nb->retrans_lock), iflags); + 
spin_unlock_bh(&(nb->retrans_lock)); break; } @@ -754,7 +755,7 @@ static int send_retrans(struct neighbor *nb, int fromqos) if (unlikely(nbstate == NEIGHBOR_STATE_KILLED)) { list_del(&(cr->timeout_list)); list_del(&(cr->conn_list)); - spin_unlock_irqrestore(&(nb->retrans_lock), iflags); + spin_unlock_bh(&(nb->retrans_lock)); kref_put(&(cr->ref), free_connretrans); continue; @@ -763,17 +764,16 @@ static int send_retrans(struct neighbor *nb, int fromqos) #warning todo check window limit if (time_after(cr->timeout, jiffies)) { - schedule_delayed_work(&(nb->retrans_timer_conn), - cr->timeout - jiffies); + mod_timer(&(nb->retrans_timer_conn), cr->timeout); if (fromqos) kref_get(&(nb->ref)); rescheduled = 1; - spin_unlock_irqrestore(&(nb->retrans_lock), iflags); + spin_unlock_bh(&(nb->retrans_lock)); break; } kref_get(&(cr->ref)); - spin_unlock_irqrestore(&(nb->retrans_lock), iflags); + spin_unlock_bh(&(nb->retrans_lock)); queuefull = _send_retrans(nb, cr); kref_put(&(cr->ref), free_connretrans); if (queuefull) { @@ -790,24 +790,27 @@ static int send_retrans(struct neighbor *nb, int fromqos) return queuefull; } -void retransmit_conn_timerfunc(struct work_struct *work) +void retransmit_conn_taskfunc(unsigned long arg) { - struct neighbor *nb = container_of(to_delayed_work(work), - struct neighbor, retrans_timer_conn); - + struct neighbor *nb = (struct neighbor *) arg; send_retrans(nb, 0); } +void retransmit_conn_timerfunc(unsigned long arg) +{ + struct neighbor *nb = (struct neighbor *) arg; + tasklet_schedule(&(nb->retrans_task_conn)); +} + void conn_ack_ooo_rcvd(struct neighbor *nb, __u32 conn_id, struct conn *trgt_out, __u32 seqno_ooo, __u32 length) { - unsigned long iflags; struct list_head *curr; if (unlikely(length == 0)) return; - mutex_lock(&(trgt_out->rcv_lock)); + spin_lock_bh(&(trgt_out->rcv_lock)); if (unlikely(trgt_out->targettype != TARGET_OUT)) goto out; @@ -816,7 +819,7 @@ void conn_ack_ooo_rcvd(struct neighbor *nb, __u32 conn_id, if (unlikely(trgt_out->target.out.conn_id != conn_id)) goto out; - spin_lock_irqsave(&(nb->retrans_lock), iflags); + spin_lock_bh(&(nb->retrans_lock)); curr = trgt_out->target.out.retrans_list.next; @@ -864,19 +867,18 @@ cont: trgt_out->target.out.seqno_acked = cr->seqno; } - spin_unlock_irqrestore(&(nb->retrans_lock), iflags); + spin_unlock_bh(&(nb->retrans_lock)); out: - mutex_unlock(&(trgt_out->rcv_lock)); + spin_unlock_bh(&(trgt_out->rcv_lock)); } void conn_ack_rcvd(struct neighbor *nb, __u32 conn_id, struct conn *trgt_out, __u32 seqno, int setwindow, __u8 window) { int flush = 0; - unsigned long iflags; - mutex_lock(&(trgt_out->rcv_lock)); + spin_lock_bh(&(trgt_out->rcv_lock)); if (unlikely(trgt_out->isreset != 0)) goto out; @@ -907,7 +909,7 @@ skipwindow: if (seqno == trgt_out->target.out.seqno_acked) goto out; - spin_lock_irqsave(&(nb->retrans_lock), iflags); + spin_lock_bh(&(nb->retrans_lock)); trgt_out->target.out.seqno_acked = seqno; @@ -930,11 +932,11 @@ skipwindow: kref_put(&(cr->ref), free_connretrans); } - spin_unlock_irqrestore(&(nb->retrans_lock), iflags); + spin_unlock_bh(&(nb->retrans_lock)); databuf_ack(trgt_out, trgt_out->target.out.seqno_acked); out: - mutex_unlock(&(trgt_out->rcv_lock)); + spin_unlock_bh(&(trgt_out->rcv_lock)); if (flush) flush_buf(trgt_out); @@ -943,8 +945,6 @@ out: static void schedule_retransmit_conn(struct conn_retrans *cr, struct conn *trgt_out_l, __u32 seqno, __u32 len) { - unsigned long iflags; - struct neighbor *nb = trgt_out_l->target.out.nb; int first; @@ -959,7 +959,7 @@ static void 
schedule_retransmit_conn(struct conn_retrans *cr, kref_init(&(cr->ref)); set_conn_retrans_timeout(cr); - spin_lock_irqsave(&(nb->retrans_lock), iflags); + spin_lock_bh(&(nb->retrans_lock)); first = unlikely(list_empty(&(nb->retrans_list_conn))); list_add_tail(&(cr->timeout_list), &(nb->retrans_list_conn)); @@ -968,13 +968,12 @@ static void schedule_retransmit_conn(struct conn_retrans *cr, if (unlikely(unlikely(first) && unlikely(nb->retrans_timer_conn_running == 0))) { - schedule_delayed_work(&(nb->retrans_timer_conn), - cr->timeout - jiffies); + mod_timer(&(nb->retrans_timer_conn), cr->timeout); nb->retrans_timer_conn_running = 1; kref_get(&(nb->ref)); } - spin_unlock_irqrestore(&(nb->retrans_lock), iflags); + spin_unlock_bh(&(nb->retrans_lock)); } static __u32 get_windowlimit(struct conn *trgt_out_l) @@ -1032,7 +1031,7 @@ int flush_out(struct conn *trgt_out_l, int fromqos, __u32 creditsperbyte) return RC_FLUSH_CONN_OUT_OOM; } - cr = kmem_cache_alloc(connretrans_slab, GFP_KERNEL); + cr = kmem_cache_alloc(connretrans_slab, GFP_ATOMIC); if (unlikely(cr == 0)) { kfree_skb(skb); qos_enqueue_conn(trgt_out_l); @@ -1077,7 +1076,7 @@ int flush_out(struct conn *trgt_out_l, int fromqos, __u32 creditsperbyte) if (len > windowlimit) len = windowlimit; - buf = kmalloc(len, GFP_KERNEL); + buf = kmalloc(len, GFP_ATOMIC); if (unlikely(creditsperbyte * len > trgt_out_l->credits)) return RC_FLUSH_CONN_OUT_CREDITS; @@ -1095,7 +1094,7 @@ int flush_out(struct conn *trgt_out_l, int fromqos, __u32 creditsperbyte) return RC_FLUSH_CONN_OUT_OOM; } - cr = kmem_cache_alloc(connretrans_slab, GFP_KERNEL); + cr = kmem_cache_alloc(connretrans_slab, GFP_ATOMIC); if (unlikely(cr == 0)) { kfree(buf); free_control_msg(cm); @@ -1131,7 +1130,11 @@ int __init cor_snd_init(void) if (unlikely(connretrans_slab == 0)) return 1; - INIT_DELAYED_WORK(&(qos_resume_work), qos_resume); + init_timer(&qos_resume_timer); + qos_resume_timer.function = qos_resume_timerfunc; + qos_resume_timer.data = 0; + tasklet_init(&qos_resume_task, qos_resume_taskfunc, 0); + qos_resume_scheduled = 0; return 0; diff --git a/net/cor/sock.c b/net/cor/sock.c index 836afac94d9..12d1526f379 100644 --- a/net/cor/sock.c +++ b/net/cor/sock.c @@ -24,11 +24,13 @@ #include "cor.h" +struct kmem_cache *sock_slab; + /** * sock_bt_wait_list and waiting_conns are ordered by min amount first, the * order in which resuming will happen */ -DEFINE_MUTEX(sock_bufferlimits_lock); +DEFINE_SPINLOCK(sock_bufferlimits_lock); LIST_HEAD(sock_bt_list); LIST_HEAD(sock_bt_wait_list); static __u64 sock_bufferusage; @@ -71,7 +73,7 @@ static struct sock_buffertracker *get_sock_buffertracker(uid_t uid) curr = curr->next; } - sbt = kmalloc(sizeof(struct sock_buffertracker), GFP_KERNEL); + sbt = kmalloc(sizeof(struct sock_buffertracker), GFP_ATOMIC); if (sbt != 0) { memset(sbt, 0, sizeof(struct sock_buffertracker)); sbt->uid = uid; @@ -124,7 +126,7 @@ static int oosbs_resumesbt(struct sock_buffertracker *sbt) source.sock.delflush_list); int flush = 0; - mutex_lock(&(src_in_o->rcv_lock)); + spin_lock_bh(&(src_in_o->rcv_lock)); BUG_ON(src_in_o->sourcetype != SOURCE_SOCK); @@ -136,13 +138,13 @@ static int oosbs_resumesbt(struct sock_buffertracker *sbt) flush = 1; } - mutex_unlock(&(src_in_o->rcv_lock)); + spin_unlock_bh(&(src_in_o->rcv_lock)); if (flush) { if (restart == 0) { restart = 1; kref_get(&(sbt->ref)); - mutex_unlock(&sock_bufferlimits_lock); + spin_unlock_bh(&sock_bufferlimits_lock); } flush_buf(src_in_o); } @@ -162,7 +164,7 @@ static void oosbs_global(void) if (0) { restart: - 
mutex_lock(&sock_bufferlimits_lock); + spin_lock_bh(&sock_bufferlimits_lock); } curr = sock_bt_list.prev; @@ -192,7 +194,7 @@ static void oosbs_user(void) if (0) { restart: - mutex_lock(&sock_bufferlimits_lock); + spin_lock_bh(&sock_bufferlimits_lock); } curr = sock_bt_wait_list.prev; @@ -212,7 +214,7 @@ restart: static void outofsockbufferspace(struct work_struct *work) { - mutex_lock(&sock_bufferlimits_lock); + spin_lock_bh(&sock_bufferlimits_lock); if (sock_bufferusage < (BUFFERLIMIT_SOCK_GLOBAL * 3 / 4)) { oosbs_user(); if (sock_bufferusage >= (BUFFERLIMIT_SOCK_GLOBAL * 3 / 4)) @@ -222,7 +224,7 @@ global: oosbs_global(); } outofsockbufferspace_scheduled = 0; - mutex_unlock(&sock_bufferlimits_lock); + spin_unlock_bh(&sock_bufferlimits_lock); } static void _reserve_sock_buffer_inswl(struct conn *src_in_l) @@ -333,7 +335,7 @@ static int _resume_bufferwaiting_socks(struct sock_buffertracker *sbt) while (list_empty(&(sbt->waiting_conns)) && failed == 0) { struct conn *src_in_o = container_of(sbt->waiting_conns.next, struct conn, source.sock.alwait_list); - mutex_lock(&(src_in_o->rcv_lock)); + spin_lock_bh(&(src_in_o->rcv_lock)); BUG_ON(src_in_o->sourcetype == SOURCE_SOCK); BUG_ON(src_in_o->source.sock.in_alwait_list == 0); @@ -352,7 +354,7 @@ static int _resume_bufferwaiting_socks(struct sock_buffertracker *sbt) wake_up_interruptible(&(src_in_o->source.sock.wait)); out: - mutex_unlock(&(src_in_o->rcv_lock)); + spin_unlock_bh(&(src_in_o->rcv_lock)); } return failed; @@ -396,8 +398,8 @@ void connreset_sbt(struct conn *cn) { struct sock_buffertracker *sbt; - mutex_lock(&sock_bufferlimits_lock); - mutex_lock(&(cn->rcv_lock)); + spin_lock_bh(&sock_bufferlimits_lock); + spin_lock_bh(&(cn->rcv_lock)); if (cn->sourcetype != SOURCE_SOCK) goto out; @@ -426,8 +428,8 @@ void connreset_sbt(struct conn *cn) cn->source.sock.sbt = 0; out: - mutex_unlock(&(cn->rcv_lock)); - mutex_unlock(&sock_bufferlimits_lock); + spin_unlock_bh(&(cn->rcv_lock)); + spin_unlock_bh(&sock_bufferlimits_lock); } void unreserve_sock_buffer(struct conn *cn) @@ -435,8 +437,8 @@ void unreserve_sock_buffer(struct conn *cn) int freed = 0; struct sock_buffertracker *sbt; - mutex_lock(&sock_bufferlimits_lock); - mutex_lock(&(cn->rcv_lock)); + spin_lock_bh(&sock_bufferlimits_lock); + spin_lock_bh(&(cn->rcv_lock)); if (cn->sourcetype != SOURCE_SOCK) goto out; @@ -486,75 +488,66 @@ void unreserve_sock_buffer(struct conn *cn) reorder_sock_bt_wait_list(sbt); out: - mutex_unlock(&(cn->rcv_lock)); + spin_unlock_bh(&(cn->rcv_lock)); if (freed) resume_bufferwaiting_socks(); - mutex_unlock(&sock_bufferlimits_lock); -} - - -static int check_connlistener_state(struct connlistener *cl) -{ - if (likely(cl != 0 && cl->sockstate == SOCKSTATE_LISTENER)) - return 0; - - return 1; + spin_unlock_bh(&sock_bufferlimits_lock); } -static int check_conn_state(struct conn *cn) -{ - if (likely(cn != 0 && cn->sockstate == SOCKSTATE_CONN)) - return 0; - - return 1; -} int cor_socket_release(struct socket *sock) { - struct connlistener *cl = (struct connlistener *) sock->sk; - struct conn *src_sock_o = (struct conn *) sock->sk; - - if (sock->sk == 0) - return 0; - - if (cl->sockstate == SOCKSTATE_LISTENER) { - close_port(cl); - } else if (src_sock_o->sockstate == SOCKSTATE_CONN) { - reset_conn(src_sock_o); - BUG_ON(src_sock_o->sourcetype != SOURCE_SOCK); - kref_put(&(src_sock_o->ref), free_conn); + struct cor_sock *cs = (struct cor_sock *) sock->sk; + + if (cs->type == SOCKTYPE_UNCONNECTED) { + } else if (cs->type == SOCKTYPE_LISTENER) { + close_port(cs); + } else 
if (cs->type == SOCKTYPE_CONN) { + reset_conn(cs->data.conn.src_sock); + kref_put(&(cs->data.conn.src_sock->ref), free_conn); + kref_put(&(cs->data.conn.trgt_sock->ref), free_conn); + + if (cs->data.conn.rcvitem != 0) { + databuf_item_free(cs->data.conn.rcvitem); + cs->data.conn.rcvitem = 0; + } } else { BUG(); } + kmem_cache_free(sock_slab, cs); + return 0; } int cor_socket_bind(struct socket *sock, struct sockaddr *myaddr, int sockaddr_len) { - struct connlistener *listener; + int rc = 0; + struct cor_sock *cs = (struct cor_sock *) sock->sk; struct cor_sockaddr *addr = (struct cor_sockaddr *) myaddr; - if (unlikely(sock->sk != 0)) + if (unlikely(sockaddr_len < sizeof(struct cor_sockaddr))) return -EINVAL; - if (sockaddr_len < sizeof(struct cor_sockaddr)) + if (unlikely(addr->type != SOCKADDRTYPE_PORT)) return -EINVAL; - if (addr->type != SOCKADDRTYPE_PORT) - return -EINVAL; + spin_lock_bh(&(cs->lock)); - listener = open_port(addr->addr.port); + if (unlikely(cs->type != SOCKTYPE_UNCONNECTED)) { + rc = -EINVAL; + goto out; + } - if (listener == 0) - return -EADDRINUSE; + rc = open_port(cs, addr->addr.port); - sock->sk = (struct sock *) listener; +out: + spin_unlock_bh(&(cs->lock)); - return 0; + return rc; } int cor_socket_connect(struct socket *sock, struct sockaddr *vaddr, @@ -564,8 +557,7 @@ int cor_socket_connect(struct socket *sock, struct sockaddr *vaddr, struct conn *src_sock; - if (unlikely(sock->sk != 0)) - return -EISCONN; + struct cor_sock *cs = (struct cor_sock *) sock->sk; src_sock = alloc_conn(GFP_KERNEL); @@ -574,37 +566,55 @@ int cor_socket_connect(struct socket *sock, struct sockaddr *vaddr, src_sock->is_client = 1; - mutex_lock(&sock_bufferlimits_lock); + spin_lock_bh(&sock_bufferlimits_lock); sbt = get_sock_buffertracker(current_uid()); - mutex_unlock(&sock_bufferlimits_lock); + spin_unlock_bh(&sock_bufferlimits_lock); if (unlikely(sbt == 0)) { reset_conn(src_sock); return -ENOMEM; } - kref_get(&(src_sock->ref)); + spin_lock_bh(&(src_sock->rcv_lock)); + spin_lock_bh(&(src_sock->reversedir->rcv_lock)); + spin_lock_bh(&(cs->lock)); + if (cs->type != SOCKTYPE_UNCONNECTED) { + spin_unlock_bh(&(cs->lock)); + spin_unlock_bh(&(src_sock->reversedir->rcv_lock)); + spin_unlock_bh(&(src_sock->rcv_lock)); + reset_conn(src_sock); + kref_put(&(sbt->ref), free_sbt); + return -EISCONN; + } - mutex_lock(&(src_sock->rcv_lock)); - mutex_lock(&(src_sock->reversedir->rcv_lock)); conn_init_sock_source(src_sock); src_sock->source.sock.sbt = sbt; conn_init_sock_target(src_sock->reversedir); - mutex_unlock(&(src_sock->reversedir->rcv_lock)); - mutex_unlock(&(src_sock->rcv_lock)); - sock->sk = (struct sock *) src_sock; + memset(&(cs->data), 0, sizeof(cs->data)); + cs->type = SOCKTYPE_CONN; + cs->data.conn.src_sock = src_sock; + cs->data.conn.trgt_sock = src_sock->reversedir; + mutex_init(&(cs->data.conn.rcvbuf_lock)); + kref_get(&(src_sock->ref)); + kref_get(&(src_sock->reversedir->ref)); + + spin_unlock_bh(&(cs->lock)); + spin_unlock_bh(&(src_sock->reversedir->rcv_lock)); + spin_unlock_bh(&(src_sock->rcv_lock)); + sock->state = SS_CONNECTED; return 0; } -static int cor_rdytoaccept(struct connlistener *cl) +static int cor_rdytoaccept(struct cor_sock *cs) { int rc; - mutex_lock(&(cl->lock)); - rc = (list_empty(&(cl->conn_queue)) == 0); - mutex_unlock(&(cl->lock)); + spin_lock_bh(&(cs->lock)); + BUG_ON(cs->type != SOCKTYPE_LISTENER); + rc = (list_empty(&(cs->data.listener.conn_queue)) == 0); + spin_unlock_bh(&(cs->lock)); return rc; } @@ -614,60 +624,87 @@ int cor_socket_accept(struct socket *sock, 
struct socket *newsock, int flags) { struct sock_buffertracker *sbt; - struct connlistener *cl = (struct connlistener *) sock->sk; - - int rc = check_connlistener_state(cl); + struct cor_sock *cs = (struct cor_sock *) sock->sk; struct conn *src_sock_o; - if (unlikely(rc)) - return -EINVAL; + struct cor_sock *newcs; + + + newcs = kmem_cache_alloc(sock_slab, GFP_KERNEL); + if (unlikely(newcs == 0)) + return -ENOMEM; + memset(newcs, 0, sizeof(struct cor_sock)); + newcs->type = SOCKTYPE_CONN; + spin_lock_init(&(newcs->lock)); - mutex_lock(&sock_bufferlimits_lock); + spin_lock_bh(&sock_bufferlimits_lock); sbt = get_sock_buffertracker(current_uid()); - mutex_unlock(&sock_bufferlimits_lock); + spin_unlock_bh(&sock_bufferlimits_lock); - if (unlikely(sbt == 0)) + if (unlikely(sbt == 0)) { + kmem_cache_free(sock_slab, newcs); return -ENOMEM; + } restart: - mutex_lock(&(cl->lock)); + spin_lock_bh(&(cs->lock)); - if (unlikely(cl->queue_maxlen <= 0)) { - mutex_unlock(&(cl->lock)); + BUG_ON(cs->type != SOCKTYPE_UNCONNECTED && + cs->type != SOCKTYPE_LISTENER && + cs->type != SOCKTYPE_CONN); + + if (unlikely(cs->type != SOCKTYPE_LISTENER)) { + spin_unlock_bh(&(cs->lock)); + kref_put(&(sbt->ref), free_sbt); + kmem_cache_free(sock_slab, newcs); return -EINVAL; } - while (list_empty(&(cl->conn_queue))) { - mutex_unlock(&(cl->lock)); - if (wait_event_interruptible(cl->wait, cor_rdytoaccept(cl))) { + if (unlikely(cs->data.listener.queue_maxlen <= 0)) { + spin_unlock_bh(&(cs->lock)); + kref_put(&(sbt->ref), free_sbt); + kmem_cache_free(sock_slab, newcs); + return -EINVAL; + } + + while (list_empty(&(cs->data.listener.conn_queue))) { + spin_unlock_bh(&(cs->lock)); + if (wait_event_interruptible(cs->data.listener.wait, + cor_rdytoaccept(cs))) { kref_put(&(sbt->ref), free_sbt); + kmem_cache_free(sock_slab, newcs); return -ERESTARTSYS; } - mutex_lock(&(cl->lock)); + spin_lock_bh(&(cs->lock)); } - src_sock_o = container_of(cl->conn_queue.next, struct conn, - source.sock.cl_list); + src_sock_o = container_of(cs->data.listener.conn_queue.next, + struct conn, source.sock.cl_list); BUG_ON(src_sock_o->sourcetype != SOURCE_SOCK); - list_del(cl->conn_queue.next); + list_del(cs->data.listener.conn_queue.next); - cl->queue_len--; + cs->data.listener.queue_len--; - mutex_unlock(&(cl->lock)); + spin_unlock_bh(&(cs->lock)); - mutex_lock(&(src_sock_o->rcv_lock)); - src_sock_o->source.sock.sbt = sbt; + spin_lock_bh(&(src_sock_o->rcv_lock)); if (unlikely(src_sock_o->isreset != 0)) { - mutex_unlock(&(src_sock_o->rcv_lock)); + spin_unlock_bh(&(src_sock_o->rcv_lock)); kref_put(&(src_sock_o->ref), free_conn); goto restart; } + src_sock_o->source.sock.sbt = sbt; + spin_unlock_bh(&(src_sock_o->rcv_lock)); + + newcs->data.conn.src_sock = src_sock_o; + newcs->data.conn.trgt_sock = src_sock_o->reversedir; + kref_get(&(src_sock_o->reversedir->ref)); newsock->ops = &cor_proto_ops; - newsock->sk = (struct sock *) src_sock_o; + newsock->sk = (struct sock *) newcs; newsock->state = SS_CONNECTED; return 0; @@ -675,16 +712,22 @@ restart: int cor_socket_listen(struct socket *sock, int len) { - struct connlistener *cl = (struct connlistener *) sock->sk; + struct cor_sock *cs = (struct cor_sock *) sock->sk; + + spin_lock_bh(&(cs->lock)); - int rc = check_connlistener_state(cl); + BUG_ON(cs->type != SOCKTYPE_UNCONNECTED && + cs->type != SOCKTYPE_LISTENER && + cs->type != SOCKTYPE_CONN); - if (unlikely(rc)) + if (unlikely(cs->type != SOCKTYPE_LISTENER)) { + spin_unlock_bh(&(cs->lock)); return -EOPNOTSUPP; + } - mutex_lock(&(cl->lock)); - 
cl->queue_maxlen = len; - mutex_unlock(&(cl->lock)); + cs->data.listener.queue_maxlen = len; + + spin_unlock_bh(&(cs->lock)); return 0; } @@ -699,200 +742,410 @@ int cor_ioctl(struct socket *sock, unsigned int cmd, unsigned long arg) return -ENOIOCTLCMD; } -static int sendmsg_maypush(struct conn *src_sock_o) +static int sendmsg_maypush(struct cor_sock *cs, struct conn *src_sock) { int ret = 0; - mutex_lock(&sock_bufferlimits_lock); - mutex_lock(&(src_sock_o->rcv_lock)); - if (unlikely(src_sock_o->isreset != 0)) { + + spin_lock_bh(&sock_bufferlimits_lock); + spin_lock_bh(&(src_sock->rcv_lock)); + spin_lock_bh(&(cs->lock)); + + if (unlikely(unlikely(src_sock != cs->data.conn.src_sock))) { + ret = 1; + } else if (unlikely(src_sock->isreset != 0)) { ret = 1; - } else if (src_sock_o->source.sock.wait_len == 0) { + } else if (src_sock->source.sock.wait_len == 0) { ret = 1; - } else if (src_sock_o->source.sock.alloclimit + - src_sock_o->data_buf.cpacket_buffer > - src_sock_o->data_buf.totalsize + - src_sock_o->data_buf.overhead) { + } else if (src_sock->source.sock.alloclimit + + src_sock->data_buf.cpacket_buffer > + src_sock->data_buf.totalsize + + src_sock->data_buf.overhead) { ret = 1; } else { - reserve_sock_buffer(src_sock_o, - src_sock_o->source.sock.wait_len); - if (src_sock_o->source.sock.alloclimit + - src_sock_o->data_buf.cpacket_buffer > - src_sock_o->data_buf.totalsize + - src_sock_o->data_buf.overhead) + reserve_sock_buffer(src_sock, + src_sock->source.sock.wait_len); + if (src_sock->source.sock.alloclimit + + src_sock->data_buf.cpacket_buffer > + src_sock->data_buf.totalsize + + src_sock->data_buf.overhead) ret = 1; } - mutex_unlock(&(src_sock_o->rcv_lock)); - mutex_unlock(&sock_bufferlimits_lock); + + spin_unlock_bh(&(cs->lock)); + spin_unlock_bh(&(src_sock->rcv_lock)); + spin_unlock_bh(&sock_bufferlimits_lock); + return ret; } -int cor_sendmsg(struct kiocb *iocb, struct socket *sock, struct msghdr *msg, - size_t total_len) +__s32 ___cor_sendmsg(char *buf, __u32 bufread, __u32 buflen, + __u32 totalremaining, struct cor_sock *cs, + struct conn *src_sock, struct conn *trgt_sock) { - __s64 copied = 0; + __s32 rc = 0; + __s64 bufferfree; + + spin_lock_bh(&sock_bufferlimits_lock); + spin_lock_bh(&(src_sock->rcv_lock)); + spin_lock_bh(&(cs->lock)); + + if (unlikely(unlikely(src_sock != cs->data.conn.src_sock) || + unlikely(trgt_sock != cs->data.conn.trgt_sock) || + unlikely(src_sock->isreset != 0))) { + spin_unlock_bh(&(cs->lock)); + spin_unlock_bh(&(src_sock->rcv_lock)); + spin_unlock_bh(&sock_bufferlimits_lock); + return -EPIPE; + } - struct conn *src_sock_o = (struct conn *) sock->sk; + spin_unlock_bh(&(cs->lock)); - int rc = check_conn_state(src_sock_o); + reserve_sock_buffer(src_sock, (__u32) (buflen + + sizeof(struct data_buf_item))); - int flush = (msg->msg_flags & MSG_MORE) == 0; - int blocking = (msg->msg_flags & MSG_DONTWAIT) == 0; + bufferfree = (__s64) src_sock->source.sock.alloclimit + + (__s64) src_sock->data_buf.cpacket_buffer - + (__s64) src_sock->data_buf.totalsize - + (__s64) src_sock->data_buf.overhead; - __s32 bufferfree; - __u64 max = (1LL << 32) - 1; - __u32 totallen = (total_len > max ? 
max : total_len); + spin_unlock_bh(&sock_bufferlimits_lock); - if (unlikely(rc)) - return -EBADF; + if (bufferfree < (buflen + sizeof(struct data_buf_item))) { + kfree(buf); + rc = -EAGAIN; + printk(KERN_ERR "2"); + goto out; + } -recv: - mutex_lock(&sock_bufferlimits_lock); - mutex_lock(&(src_sock_o->rcv_lock)); + rc = receive_buf(src_sock, buf, bufread, buflen, 0); - if (unlikely(src_sock_o->isreset != 0)) { - mutex_lock(&(src_sock_o->rcv_lock)); - mutex_unlock(&sock_bufferlimits_lock); - return -EPIPE; - } +out: - reserve_sock_buffer(src_sock_o, totallen); + if (rc == -EAGAIN) + src_sock->source.sock.wait_len = totalremaining + + sizeof(struct data_buf_item); + else + src_sock->source.sock.wait_len = 0; - bufferfree = (__s64) src_sock_o->source.sock.alloclimit + - (__s64) src_sock_o->data_buf.cpacket_buffer - - (__s64) src_sock_o->data_buf.totalsize - - (__s64) src_sock_o->data_buf.overhead; + spin_unlock_bh(&(src_sock->rcv_lock)); - mutex_unlock(&sock_bufferlimits_lock); + return rc; +} - if (bufferfree <= 0) { - if (copied == 0) - copied = -EAGAIN; - goto out; - } +__s32 __cor_sendmsg(struct msghdr *msg, __u32 totallen, int *iovidx, + int *iovread, struct cor_sock *cs, struct conn *src_sock, + struct conn *trgt_sock) +{ + char *buf = 0; + __u32 bufread = 0; + __u32 buflen = buf_optlen(totallen); - copied = receive_userbuf(src_sock_o, msg, bufferfree, bufferfree >= - totallen ? 0 : (src_sock_o->source.sock.alloclimit + - src_sock_o->data_buf.cpacket_buffer)); + buf = kmalloc(buflen, GFP_KERNEL); + if (unlikely(buf == 0)) + return -ENOMEM; + memset(buf, 0, buflen); - if (0) { -out: - bufferfree = (__s64) src_sock_o->source.sock.alloclimit + - (__s64) src_sock_o->data_buf.cpacket_buffer - - (__s64) src_sock_o->data_buf.totalsize - - (__s64) src_sock_o->data_buf.overhead; + while (bufread < buflen && bufread < totallen) { + struct iovec *iov = msg->msg_iov + *iovidx; + __user char *userbuf = iov->iov_base + *iovread; + __u32 len = iov->iov_len - *iovread; + int notcopied; + + if (len == 0) { + (*iovidx)++; + (*iovread) = 0; + BUG_ON(*iovidx >= msg->msg_iovlen); + continue; + } + + if (len > (buflen - bufread)) + len = buflen - bufread; + if (len > (totallen - bufread)) + len = totallen - bufread; + + notcopied = copy_from_user(buf + bufread, userbuf, len); + + bufread += len - notcopied; + (*iovread) += len - notcopied; + + if (unlikely(notcopied == buflen) && bufread == 0) { + kfree(buf); + return -EFAULT; + } + + if (unlikely(notcopied > 0)) + break; } - if (copied == -EAGAIN) - src_sock_o->source.sock.wait_len = totallen; - else - src_sock_o->source.sock.wait_len = 0; + return ___cor_sendmsg(buf, bufread, buflen, totallen, cs, src_sock, + trgt_sock); +} - mutex_unlock(&(src_sock_o->rcv_lock)); +__s32 _cor_sendmsg(struct msghdr *msg, size_t total_len, struct cor_sock *cs, + struct conn *src_sock, struct conn *trgt_sock) +{ + int flush = (msg->msg_flags & MSG_MORE) == 0; + + __s32 copied = 0; + __s32 rc = 0; + __u64 max = (1LL << 31) - 1; + __u32 totallen = (total_len > max ? 
max : total_len); - unreserve_sock_buffer(src_sock_o); + int iovidx = 0; + int iovread = 0; - mutex_lock(&sock_bufferlimits_lock); - mutex_lock(&(src_sock_o->rcv_lock)); + while (rc >= 0 && copied < totallen) { + rc = __cor_sendmsg(msg, totallen - copied, &iovidx, &iovread, + cs, src_sock, trgt_sock); - if (unlikely(src_sock_o->isreset != 0)) { - mutex_lock(&(src_sock_o->rcv_lock)); - mutex_unlock(&sock_bufferlimits_lock); + if (rc > 0 || copied == 0) + copied += rc; + } + + unreserve_sock_buffer(src_sock); + + spin_lock_bh(&sock_bufferlimits_lock); + spin_lock_bh(&(src_sock->rcv_lock)); + + if (unlikely(src_sock->isreset != 0)) { + spin_unlock_bh(&(src_sock->rcv_lock)); + spin_unlock_bh(&sock_bufferlimits_lock); return -EPIPE; } - if (flush == 0 && src_sock_o->data_buf.totalsize + - src_sock_o->data_buf.overhead - - src_sock_o->data_buf.cpacket_buffer < + if (flush == 0 && copied > 0 && copied == total_len && + src_sock->data_buf.totalsize + + src_sock->data_buf.overhead - + src_sock->data_buf.cpacket_buffer < (BUFFERLIMIT_SOCK_SOCK*3)/4) { - if (src_sock_o->source.sock.delay_flush == 0) { + if (src_sock->source.sock.delay_flush == 0) { struct sock_buffertracker *sbt = - src_sock_o->source.sock.sbt; + src_sock->source.sock.sbt; BUG_ON(sbt == 0); - list_add_tail(&(src_sock_o->source.sock.delflush_list), + list_add_tail(&(src_sock->source.sock.delflush_list), &(sbt->delflush_conns)); } - src_sock_o->source.sock.delay_flush = 1; + src_sock->source.sock.delay_flush = 1; } else { - if (src_sock_o->source.sock.delay_flush) { - list_del(&(src_sock_o->source.sock.delflush_list)); + if (src_sock->source.sock.delay_flush) { + list_del(&(src_sock->source.sock.delflush_list)); } - src_sock_o->source.sock.delay_flush = 0; + src_sock->source.sock.delay_flush = 0; } - mutex_unlock(&(src_sock_o->rcv_lock)); - mutex_unlock(&sock_bufferlimits_lock); + spin_unlock_bh(&(src_sock->rcv_lock)); + spin_unlock_bh(&sock_bufferlimits_lock); - if (likely(copied > 0 || bufferfree <= 0)) - flush_buf(src_sock_o); + return copied; +} - if (copied == -EAGAIN && blocking) { - if (wait_event_interruptible(src_sock_o->source.sock.wait, - sendmsg_maypush(src_sock_o)) == 0) - goto recv; - copied = -ERESTARTSYS; +int cor_sendmsg(struct kiocb *iocb, struct socket *sock, struct msghdr *msg, + size_t total_len) +{ + __s32 rc = 0; + + int blocking = (msg->msg_flags & MSG_DONTWAIT) == 0; + + struct cor_sock *cs = (struct cor_sock *) sock->sk; + struct conn *src_sock; + struct conn *trgt_sock; + + spin_lock_bh(&(cs->lock)); + + BUG_ON(cs->type != SOCKTYPE_UNCONNECTED && + cs->type != SOCKTYPE_LISTENER && + cs->type != SOCKTYPE_CONN); + + if (unlikely(cs->type != SOCKTYPE_CONN)) { + spin_unlock_bh(&(cs->lock)); + return -EBADF; } - BUG_ON(copied > total_len); - return copied; + src_sock = cs->data.conn.src_sock; + trgt_sock = cs->data.conn.trgt_sock; + + kref_get(&(src_sock->ref)); + kref_get(&(trgt_sock->ref)); + + spin_unlock_bh(&(cs->lock)); + + BUG_ON(src_sock == 0); + BUG_ON(trgt_sock == 0); + +send: + rc = _cor_sendmsg(msg, total_len, cs, src_sock, trgt_sock); + + if (likely(rc > 0 || rc == -EAGAIN)) + flush_buf(src_sock); + + if (rc == -EAGAIN && blocking) { + #warning todo move waitqueue to cor_sock + if (wait_event_interruptible(src_sock->source.sock.wait, + sendmsg_maypush(cs, src_sock)) == 0) + goto send; + rc = -ERESTARTSYS; + } + + BUG_ON(rc > total_len); + return rc; } static int cor_readytoread(struct conn *trgt_sock_o) { int rc = 0; - mutex_lock(&(trgt_sock_o->rcv_lock)); + spin_lock_bh(&(trgt_sock_o->rcv_lock)); rc = 
(trgt_sock_o->data_buf.read_remaining != 0) || unlikely(trgt_sock_o->isreset != 0); - mutex_unlock(&(trgt_sock_o->rcv_lock)); + spin_unlock_bh(&(trgt_sock_o->rcv_lock)); return rc; } +static int __cor_recvmsg(struct msghdr *msg, __u32 totallen, + int *iovidx, int *iovwritten, + struct cor_sock *cs, struct conn *trgt_sock) +{ + struct data_buf_item *dbi = cs->data.conn.rcvitem; + int written = 0; + + while (written < totallen && dbi != 0) { + struct iovec *iov = msg->msg_iov + *iovidx; + __user char *userbuf = iov->iov_base + *iovwritten; + __u32 len = iov->iov_len - *iovwritten; + int notcopied; + + if (len == 0) { + (*iovidx)++; + (*iovwritten) = 0; + BUG_ON(*iovidx >= msg->msg_iovlen); + continue; + } + + if (dbi->datalen == cs->data.conn.rcvoffset) { + databuf_item_free(cs->data.conn.rcvitem); + cs->data.conn.rcvitem = 0; + cs->data.conn.rcvoffset = 0; + break; + } + + if (len > (dbi->datalen - cs->data.conn.rcvoffset)) + len = dbi->datalen - cs->data.conn.rcvoffset; + if (len > (totallen - written)) + len = totallen - written; + + notcopied = copy_to_user(userbuf, dbi->buf + + cs->data.conn.rcvoffset, len); + + written += (len - notcopied); + (*iovwritten) += (len - notcopied); + cs->data.conn.rcvoffset += (len - notcopied); + + if (unlikely(notcopied == len) && written == 0) + return -EFAULT; + + if (notcopied > 0) + break; + } + + if (written == 0) + return -EAGAIN; + return written; +} + +static int _cor_recvmsg(struct msghdr *msg, size_t total_len, + struct cor_sock *cs, struct conn *trgt_sock) +{ + int copied = 0; + int rc = 0; + + __u64 max = (1LL << 31) - 1; + __u32 totallen = (total_len > max ? max : total_len); + + int iovidx = 0; + int iovwritten = 0; + + mutex_lock(&(cs->data.conn.rcvbuf_lock)); + + while (rc >= 0 && copied < totallen) { + spin_lock_bh(&(trgt_sock->rcv_lock)); + spin_lock_bh(&(cs->lock)); + if (unlikely(unlikely(trgt_sock != cs->data.conn.trgt_sock)|| + unlikely(trgt_sock->isreset != 0))) { + spin_unlock_bh(&(cs->lock)); + spin_unlock_bh(&(trgt_sock->rcv_lock)); + mutex_unlock(&(cs->data.conn.rcvbuf_lock)); + return -EPIPE; + } + + spin_unlock_bh(&(cs->lock)); + + if (cs->data.conn.rcvitem == 0) + databuf_pull_dbi(cs, trgt_sock); + + spin_unlock_bh(&(trgt_sock->rcv_lock)); + + rc = __cor_recvmsg(msg, totallen - copied, &iovidx, &iovwritten, + cs, trgt_sock); + + if (rc > 0 || copied == 0) + copied += rc; + } + + mutex_unlock(&(cs->data.conn.rcvbuf_lock)); + + return copied; +} + int cor_recvmsg(struct kiocb *iocb, struct socket *sock, struct msghdr *msg, size_t total_len, int flags) { - struct conn *src_sock_o = (struct conn *) sock->sk; - struct conn *trgt_sock_o; size_t copied = 0; + int blocking = (flags & MSG_DONTWAIT) == 0; - int rc = check_conn_state(src_sock_o); + struct cor_sock *cs = (struct cor_sock *) sock->sk; + struct conn *src_sock; + struct conn *trgt_sock; - int blocking = (flags & MSG_DONTWAIT) == 0; + spin_lock_bh(&(cs->lock)); - if (unlikely(rc)) + BUG_ON(cs->type != SOCKTYPE_UNCONNECTED && + cs->type != SOCKTYPE_LISTENER && + cs->type != SOCKTYPE_CONN); + + if (unlikely(cs->type != SOCKTYPE_CONN)) { + spin_unlock_bh(&(cs->lock)); return -EBADF; + } - trgt_sock_o = src_sock_o->reversedir; + src_sock = cs->data.conn.src_sock; + trgt_sock = cs->data.conn.trgt_sock; - BUG_ON(trgt_sock_o == 0); + BUG_ON(src_sock == 0); + BUG_ON(trgt_sock == 0); -recv: - mutex_lock(&(trgt_sock_o->rcv_lock)); - - if (unlikely(trgt_sock_o->isreset != 0)) { - copied = -EPIPE; - goto out; - } + kref_get(&(src_sock->ref)); + kref_get(&(trgt_sock->ref)); - copied = 
databuf_pulluser(trgt_sock_o, msg); - databuf_ackread(trgt_sock_o); + spin_unlock_bh(&(cs->lock)); -out: - mutex_unlock(&(trgt_sock_o->rcv_lock)); +recv: + copied = _cor_recvmsg(msg, total_len, cs, trgt_sock); if (likely(copied > 0)) { - refresh_conn_credits(trgt_sock_o, 0, 0); - wake_sender(trgt_sock_o); + refresh_conn_credits(trgt_sock, 0, 0); + wake_sender(trgt_sock); } - if (copied == -EAGAIN && blocking) { - if (wait_event_interruptible(trgt_sock_o->target.sock.wait, - cor_readytoread(trgt_sock_o)) == 0) + if (wait_event_interruptible(trgt_sock->target.sock.wait, + cor_readytoread(trgt_sock)) == 0) goto recv; copied = -ERESTARTSYS; } + kref_put(&(src_sock->ref), free_conn); + kref_put(&(trgt_sock->ref), free_conn); + return copied; } @@ -924,11 +1177,22 @@ const struct proto_ops cor_proto_ops = { int cor_createsock(struct net *net, struct socket *sock, int protocol) { + struct cor_sock *cs; + if (unlikely(protocol != 0)) return -EPROTONOSUPPORT; + cs = kmem_cache_alloc(sock_slab, GFP_KERNEL); + if (unlikely(cs == 0)) + return -ENOMEM; + memset(cs, 0, sizeof(struct cor_sock)); + + cs->type = SOCKTYPE_UNCONNECTED; + spin_lock_init(&(cs->lock)); + sock->state = SS_UNCONNECTED; sock->ops = &cor_proto_ops; + sock->sk = (struct sock *) cs; return 0; } @@ -941,6 +1205,9 @@ static struct net_proto_family cor_net_proto_family = { static int __init cor_sock_init(void) { + sock_slab = kmem_cache_create("cor_sock", + sizeof(struct cor_sock), 8, 0, 0); + INIT_WORK(&outofsockbufferspace_work, outofsockbufferspace); outofsockbufferspace_scheduled = 0; -- 2.11.4.GIT