当前位置: 代码迷 >> 综合 >> linux内核协议栈 UDP之数据报接收过程Ⅱ
  详细解决方案

linux内核协议栈 UDP之数据报接收过程Ⅱ

热度:54   发布时间:2024-03-06 15:01:14.0

目录

1 系统调用 udp_recvmsg()

1.1 从接收队列 sk_receive_queue 中获取skb

1.1.1 获取队列头不删除 skb_peek()

1.1.2 将 skb 从移除队列中 __skb_unlink()

1.2 尝试释放skb内存 skb_free_datagram_locked()

2 后备队列 sk_backlog 中的skb处理 release_sock()

2.1 后备队列skb进入接收队列 sk_backlog_rcv()


1 系统调用 udp_recvmsg()

对于应用程序而言,读操作可以通过多个系统调用实现,如read()、recv()、recvfrom()等等,但是这些系统调用到了传输层协议,都调用到了同一接口,对于UDP就是udp_recvmsg()。

int udp_recvmsg(struct kiocb *iocb, struct sock *sk, struct msghdr *msg,size_t len, int noblock, int flags, int *addr_len)
{struct inet_sock *inet = inet_sk(sk);struct sockaddr_in *sin = (struct sockaddr_in *)msg->msg_name;struct sk_buff *skb;unsigned int ulen, copied;int peeked;int err;int is_udplite = IS_UDPLITE(sk);//需要返回源地址信息,设置源地址长度if (addr_len)*addr_len = sizeof(*sin);//如果设置了MSG_ERRQUEUE标记,那么只读取错误信息if (flags & MSG_ERRQUEUE)return ip_recv_error(sk, msg, len);try_again://根据是否需要阻塞,从接收队列中取出一个SKBskb = __skb_recv_datagram(sk, flags | (noblock ? MSG_DONTWAIT : 0),&peeked, &err);if (!skb)goto out;//ulen为该SKB中包含的应用数据长度ulen = skb->len - sizeof(struct udphdr);//len为应用程序指定的buffer大小,所以下面的逻辑含义为://1. 如果应用提供的buffer超过了该数据包的数据长度,那么调整要拷贝的数据量为该SKB中实际数据量//2. 如果应用提供的buffer不够大,那么需要截断数据包,设置截断标记copied = len;if (copied > ulen)copied = ulen;else if (copied < ulen)msg->msg_flags |= MSG_TRUNC;/** If checksum is needed at all, try to do it while copying the* data.  If the data is truncated, or if we only want a partial* coverage checksum (UDP-Lite), do it before the copy.*///条件一:对于截断的数据包和尚未完成校验的数据包,先进行校验,校验出错则尝试读取下一个数据包//条件二:实际上只用于UDPLite,因为UDP协议的校验在接收过程的第一步就完成了if (copied < ulen || UDP_SKB_CB(skb)->partial_cov) {if (udp_lib_checksum_complete(skb))goto csum_copy_err;}//根据是否需要校验,调用不同的数据拷贝函数if (skb_csum_unnecessary(skb))err = skb_copy_datagram_iovec(skb, sizeof(struct udphdr),msg->msg_iov, copied);else {//在数据拷贝过程中还会进行校验err = skb_copy_and_csum_datagram_iovec(skb,sizeof(struct udphdr),msg->msg_iov);if (err == -EINVAL)goto csum_copy_err;}//数据拷贝失败,返回错误if (err)goto out_free;//只有非PEEK读取才更新统计信息if (!peeked)UDP_INC_STATS_USER(sock_net(sk), UDP_MIB_INDATAGRAMS, is_udplite);//更新数据包接收的时间到sk->sk_stamp中sock_recv_timestamp(msg, sk, skb);//拷贝数据包源地址信息,该地址会返回给应用程序if (sin) {sin->sin_family = AF_INET;sin->sin_port = udp_hdr(skb)->source;sin->sin_addr.s_addr = ip_hdr(skb)->saddr;memset(sin->sin_zero, 0, sizeof(sin->sin_zero));}//获取控制信息if (inet->cmsg_flags)ip_cmsg_recv(msg, skb);//读取成功,返回值err表示的是已经读取到的字节数err = copied;if (flags & MSG_TRUNC)err = ulen;out_free://释放该SKB的数据skb_free_datagram_locked(sk, skb);
out:return err;csum_copy_err:lock_sock(sk);if (!skb_kill_datagram(sk, skb, flags))UDP_INC_STATS_USER(sock_net(sk), UDP_MIB_INERRORS, is_udplite);release_sock(sk);if (noblock)return -EAGAIN;goto try_again;
}

1.1 从接收队列 sk_receive_queue 中获取skb

/***	__skb_recv_datagram - Receive a datagram skbuff*	@sk: socket*	@flags: MSG_ flags*	@peeked: returns non-zero if this packet has been seen before*	@err: error code returned*/
struct sk_buff *__skb_recv_datagram(struct sock *sk, unsigned flags,int *peeked, int *err)
{struct sk_buff *skb;long timeo;//如果该socket遇到了错误,返回错误int error = sock_error(sk);if (error)goto no_packet;//根据是否设置了非阻塞标记,获取超时时间。对于非阻塞模式,timeo为0timeo = sock_rcvtimeo(sk, flags & MSG_DONTWAIT);do {/* Again only user level code calls this function, so nothing* interrupt level will suddenly eat the receive_queue.** Look at current nfs client by the way...* However, this function was corrent in any case. 8)*/unsigned long cpu_flags;//关中断并且持有接收队列的锁spin_lock_irqsave(&sk->sk_receive_queue.lock, cpu_flags);//获取接收队列中的第一个skbskb = skb_peek(&sk->sk_receive_queue);if (skb) {*peeked = skb->peeked;//如果设置了MSG_PEEK标记,那么设置skb的peek标记,并且增加对skb的引用计数,//该标记很重要,会影响是否释放该skb,见下文的总结if (flags & MSG_PEEK) {skb->peeked = 1;atomic_inc(&skb->users);} else//非MSG_PEEK场景,将该skb从接收队列中移除__skb_unlink(skb, &sk->sk_receive_queue);}//释放接收队列锁并开启中断spin_unlock_irqrestore(&sk->sk_receive_queue.lock, cpu_flags);//找到了skb,直接返回if (skb)return skb;//当前接收队列为空,如果超时时间为0,即非阻塞模式,那么直接返回EAGAIN错误/* User doesn't want to wait */error = -EAGAIN;if (!timeo)goto no_packet;//没有可读数据,需要阻塞等待数据可用,阻塞在了sk->sk_sleep等待队列上} while (!wait_for_packet(sk, err, &timeo));return NULL;no_packet:*err = error;return NULL;
}

1.1.1 获取队列头不删除 skb_peek()

/***	skb_peek*	@list_: list to peek at**	Peek an &sk_buff. Unlike most other operations you _MUST_*	be careful with this one. A peek leaves the buffer on the*	list and someone else may run off with it. You must hold*	the appropriate locks or have a private queue to do this.**	Returns %NULL for an empty list or a pointer to the head element.*	The reference count is not incremented and the reference is therefore*	volatile. Use with caution.*/
//如注释所述,使用该函数需要小心,保证不会有并发问题。这里是在持有锁的情况下操作的
static inline struct sk_buff *skb_peek(struct sk_buff_head *list_)
{//该函数会返回list中第一个skb的指针,但是并不会将该skb从队列中移除,这点很重要struct sk_buff *list = ((struct sk_buff *)list_)->next;if (list == (struct sk_buff *)list_)list = NULL;return list;
}

1.1.2 将 skb 从移除队列中 __skb_unlink()

/** remove sk_buff from list. _Must_ be called atomically, and with* the list known..*/
//将skb从队列list中移除,典型的链表操作
static inline void __skb_unlink(struct sk_buff *skb, struct sk_buff_head *list)
{struct sk_buff *next, *prev;list->qlen--;next	   = skb->next;prev	   = skb->prev;skb->next  = skb->prev = NULL;next->prev = prev;prev->next = next;
}

1.2 尝试释放skb内存 skb_free_datagram_locked()

该函数尝试释放SKB,但是要注意,是否真的会释放最终取决于SKB自己维护的引用计数。

/** Read buffer destructor automatically called from kfree_skb.*/
void sock_rfree(struct sk_buff *skb)
{struct sock *sk = skb->sk;//该SKB将被释放,所以递减传输控制块占用的内存记账atomic_sub(skb->truesize, &sk->sk_rmem_alloc);sk_mem_uncharge(skb->sk, skb->truesize);
}void skb_free_datagram_locked(struct sock *sk, struct sk_buff *skb)
{//因为如果真的触发释放SKB,那么会调用skb->destructor()回调,在接收过程的第一步,找到//传输控制块后,使用skb_set_owner_r()将该skb的属主设置成了当前传输控制块,当时指定的//回调函数是sock_rfree(),在该函数中会操作传输控制块的成员,所以这里需要提前锁定lock_sock(sk);skb_free_datagram(sk, skb);release_sock(sk);
}void skb_free_datagram(struct sock *sk, struct sk_buff *skb)
{consume_skb(skb);sk_mem_reclaim_partial(sk);
}/***	consume_skb - free an skbuff*	@skb: buffer to free**	Drop a ref to the buffer and free it if the usage count has hit zero*	Functions identically to kfree_skb, but kfree_skb assumes that the frame*	is being dropped after a failure and notes that*/
void consume_skb(struct sk_buff *skb)
{if (unlikely(!skb))return;//如果该skb的引用计数为1,那么需要真的释放if (likely(atomic_read(&skb->users) == 1))smp_rmb();//如果skb引用计数大于1,那么仅仅是将其引用计数减1else if (likely(!atomic_dec_and_test(&skb->users)))return;__kfree_skb(skb);
}

到此,回忆一下前面设置了MSG_PEEK的处理,对于此种情况,在调用__skb_recv_datagram()时并不会真的将skb从接收队列中移除,只是返回其指针,并且增加了对该skb的引用计数,所以在接收完毕后调用skb_free_datagram_locked()的时候,该skb的引用计数至少为2,并不会真正的释放。

2 后备队列 sk_backlog 中的skb处理 release_sock()

在《linux内核协议栈 UDP之数据报接收过程Ⅰ》中有提到,在软中断接收过程中,如果当前传输控制块刚好被进程上下文锁定,那么只是将数据放入到后备队列中,我们并没有介绍该队列中的数据又是如何被应用接收的。实际上,在进程上下文中调用release_sock()的时候会处理该后备队列,代码如下:

void release_sock(struct sock *sk)
{/** The sk_lock has mutex_unlock() semantics:*/mutex_release(&sk->sk_lock.dep_map, 1, _RET_IP_);spin_lock_bh(&sk->sk_lock.slock);//重点看这里,如果后备队列不为空,调用__release_sock()进行处理if (sk->sk_backlog.tail)__release_sock(sk);sk->sk_lock.owned = 0;if (waitqueue_active(&sk->sk_lock.wq))wake_up(&sk->sk_lock.wq);spin_unlock_bh(&sk->sk_lock.slock);
}

关于传输控制块的同步锁可以参考笔记《linux内核协议栈 套接口层之传输控制块同步锁socket_lock_t》

static void __release_sock(struct sock *sk)
{//获取后备队列第一个元素struct sk_buff *skb = sk->sk_backlog.head;do {//这里先将后备队列清空,然后打开硬中断,但是软中没有打开。//由于对数据包的处理比较耗时,这种处理方式可以提高系统性能sk->sk_backlog.head = sk->sk_backlog.tail = NULL;bh_unlock_sock(sk);//循环处理后备队列中数据包do {struct sk_buff *next = skb->next;skb->next = NULL;//处理该数据包sk_backlog_rcv(sk, skb);/** We are in process context here with softirqs* disabled, use cond_resched_softirq() to preempt.* This is safe to do because we've taken the backlog* queue private:*///重新调度一下下半部cond_resched_softirq();skb = next;} while (skb != NULL);//再次持锁,因为要判断传输控制块的后备队列是否为空。因为前面重新调度过软中断,//所以下面的外层循环可以保证能够处理新到来的数据包bh_lock_sock(sk);} while ((skb = sk->sk_backlog.head) != NULL);
}

2.1 后备队列skb进入接收队列 sk_backlog_rcv()

static inline int sk_backlog_rcv(struct sock *sk, struct sk_buff *skb)
{//对于UDP,该回调函数就是__udp_queue_rcv_skb(),//对于TCP,该回调函数则是 tcp_v4_do_rcv()//在软中断中就是使用该函数将数据包放入了接收队列return sk->sk_backlog_rcv(sk, skb);
}