基于Select/Poll实现并发服务器(二)

描述

开发环境:

RT-Thread版本:4.0.3

操作系统:Windows10

Keil版本:V5.30

RT-Thread Studio版本:2.0.1

开发板MCU:STM32H750XB

LWIP:2.0.2

3 Select/Poll概述

在LWIP中,如果要实现并发服务器,可以基于Sequentaial API来实现,这种方式需要使用多线程,也就是为每个连接创建一个线程来处理数据。而在资源受限的嵌入式设备来说,如果为每个连接都创建一个线程,这种资源的消耗是巨大的,因此,我们需要换一种实现思路,也就是使用IO多路复用的机制来实现,也就是select机制。

Select/Poll则是POSIX所规定,一般操作系统或协议栈均有实现。

值得注意的是,poll和select都是基于内核函数sys_poll实现的,不同在于在Linux系统中select是从BSD Unix系统继承而来,poll则是从System V Unix系统继承而来,因此两种方式相差不大。poll函数没有最大文件描述符数量的限制。poll和 select与一样,大量文件描述符的数组被整体复制于用户和内核的地址空间之间,开销随着文件描述符数量的增加而线性增大。

 

3.1 Select函数

在BSD Socket 中,select函数原型如下:

int select(int nfds, fd_set *readfds, fd_set *writefds, fd_set *exceptfds,struct timeval *timeout);

【参数说明】

  • nfds:select监视的文件句柄数,一般设为要监视各文件中的最大文件描述符值加1。
  • readfds:文件描述符集合监视文件集中的任何文件是否有数据可读,当select函数返回的时候,readfds将清除其中不可读的文件描述符,只留下可读的文件描述符。
  • writefds:文件描述符集合监视文件集中的任何文件是否有数据可写,当select函数返回的时候,writefds将清除其中不可写的文件描述符,只留下可写的文件描述符。
  • exceptfds:文件集将监视文件集中的任何文件是否发生错误,可用于其他的用途,例如,监视带外数据OOB,带外数据使用MSG_OOB标志发送到套接字上。当select函数返回的时候,exceptfds将清除其中的其他文件描述符,只留下标记有OOB数据的文件描述符。
  • timeout 参数是一个指向 struct timeval 类型的指针,它可以使 select()在等待 timeout 时间后若没有文件描述符准备好则返回。其timeval结构用于指定这段时间的秒数和微秒数。它可以使select处于三种状态:

(1) 若将NULL以形参传入,即不传入时间结构,就是将select置于阻塞状态,一定等到监视文件描述符集合中某个文件描述符发生变化为止;

(2) 若将时间值设为0秒0毫秒,就变成一个纯粹的非阻塞函数,不管文件描述符是否有变化,都立刻返回继续执行,文件无变化返回0,有变化返回一个正值;

(3) timeout的值大于0,这就是等待的超时时间,即select在timeout时间内阻塞,超时时间之内有事件到来就返回了,否则在超时后不管怎样一定返回,返回值同上述。

timeval 结构体定义

struct timeval
{
    int tv_sec;/* 秒 */
    int tv_usec;/* 微妙 */
};

【返回值】

  • int:若有就绪描述符返回其数目,若超时则为0,若出错则为-1

 

下列操作用来设置、清除、判断文件描述符集合。

FD_ZERO(fd_set *set);//清除一个文件描述符集。

FD_SET(int fd,fd_set *set);//将一个文件描述符加入文件描述符集中。

FD_CLR(int fd,fd_set *set);//将一个文件描述符从文件描述符集中清除。

FD_ISSET(int fd,fd_set *set);//判断文件描述符是否被置位

fd_set可以理解为一个集合,这个集合中存放的是文件描述符(file descriptor),即文件句柄。中间的三个参数指定我们要让内核测试读、写和异常条件的文件描述符集合。如果对某一个的条件不感兴趣,就可以把它设为空指针。

select()的机制中提供一种fd_set的数据结构,实际上是一个long类型的数组,每一个数组元素都能与打开的文件句柄(不管是Socket句柄,还是其他文件或命名管道或设备句柄)建立联系,建立联系的工作由程序员完成,当调用select()时,由内核根据IO状态修改fd_set的内容,由此来通知执行了select()的进程哪一Socket或文件可读。

 

3.2 Poll函数

poll的函数原型:

int poll(struct pollfd *fds, nfds_t nfds, int timeout);

【参数说明】

  • fds:fds是一个struct pollfd类型的数组,用于存放需要检测其状态的socket描述符,并且调用poll函数之后fds数组不会被清空;一个pollfd结构体表示一个被监视的文件描述符,通过传递fds指示 poll() 监视多个文件描述符。

struct pollfd原型如下:

typedef struct pollfd {
        int fd;                 // 需要被检测或选择的文件描述符
        short events;           // 对文件描述符fd上感兴趣的事件
        short revents;          // 文件描述符fd上当前实际发生的事件
} pollfd_t;

 

其中,结构体的events域是监视该文件描述符的事件掩码,由用户来设置这个域,结构体的revents域是文件描述符的操作结果事件掩码,内核在调用返回时设置这个域。

  • nfds:记录数组fds中描述符的总数量。
  • timeout:指定等待的毫秒数,无论 I/O 是否准备好,poll() 都会返回,和select函数是类似的。

【返回值】

  • int:函数返回fds集合中就绪的读、写,或出错的描述符数量,返回0表示超时,返回-1表示出错;

 

poll改变了文件描述符集合的描述方式,使用了pollfd结构而不是select的fd_set结构,使得poll支持的文件描述符集合限制远大于select的1024。这也是和select不同的地方。

 

4 LWIP 的select/poll实现

好了,接下来看看LWIP是如何实现select/poll的。

4.1 lwip_select实现

目前LWIP已经完全实现select,它是基于信号量的机制来实现的,函数名是lwip_select。

LWIP实现Select的基本流程如下:

1.依次检套接字集合中的每个套接字的事件表示,若有效,则记录该套接字。

2.若存在一个或多事件,则返回,否则创建一个信号量并阻塞等待,记录信号量的结构体是select_cb_list,是一个链表,在[sockets.c]文件中定义的:

static struct lwip_select_cb *select_cb_list;//管理select的链表

lwip_select_cb原型如下:

/** Description for a task waiting in select */
struct lwip_select_cb {
 /** Pointer to the next waiting task */
 struct lwip_select_cb *next;
 /** Pointer to the previous waiting task */
 struct lwip_select_cb *prev;
#if LWIP_SOCKET_SELECT
 /** readset passed to select */
 fd_set *readset;
 /** writeset passed to select */
 fd_set *writeset;
 /** unimplemented: exceptset passed to select */
 fd_set *exceptset;
#endif /* LWIP_SOCKET_SELECT */
#if LWIP_SOCKET_POLL
 /** fds passed to poll; NULL if select */
 struct pollfd *poll_fds;
 /** nfds passed to poll; 0 if select */
 nfds_t poll_nfds;
#endif /* LWIP_SOCKET_POLL */
 /** don't signal the same semaphore twice: set to 1 when signalled */
 int sem_signalled;//是否释放信号领
 /** semaphore to wake up a task waiting for select */
 SELECT_SEM_T sem;//select阻塞的信号量
};

 

3.当套接字集合初始化,会向netconn结构注册回调函数event_callback,当有是事件发生时,回调函数就被被执行,而且回调函数会遍历select_cb_list,如果套接字在select_cb_list中,则select_cb_list释放一个信号量。

好了,接下来看看LWIP的select具体实现,其原型如下:

int lwip_select(int maxfdp1, fd_set *readset, fd_set *writeset, fd_set *exceptset,
            struct timeval *timeout)
{
  u32_t waitres = 0;//记录select等待时间
 int nready;
 fd_set lreadset, lwriteset, lexceptset;//记录发生事件的套接字
 u32_t msectimeout;
 int i;
 int maxfdp2;
#if LWIP_NETCONN_SEM_PER_THREAD
 int waited = 0;
#endif
#if LWIP_NETCONN_FULLDUPLEX
 fd_set used_sockets;
#endif
 SYS_ARCH_DECL_PROTECT(lev);

 LWIP_DEBUGF(SOCKETS_DEBUG, ("lwip_select(%d, %p, %p, %p, tvsec=%"S32_F" tvusec=%"S32_F")\n",
                              maxfdp1, (void *)readset, (void *) writeset, (void *) exceptset,
                              timeout ? (s32_t)timeout->tv_sec : (s32_t) - 1,
                              timeout ? (s32_t)timeout->tv_usec : (s32_t) - 1));

 if ((maxfdp1 < 0) || (maxfdp1 > LWIP_SELECT_MAXNFDS)) {
    set_errno(EINVAL);
    return -1;
 }

 lwip_select_inc_sockets_used(maxfdp1, readset, writeset, exceptset, &used_sockets);

 /* Go through each socket in each list to count number of sockets which
     currently match */
 //检测套接字集合中是否发生事件
 nready = lwip_selscan(maxfdp1, readset, writeset, exceptset, &lreadset, &lwriteset, &lexceptset);

 if (nready < 0) {
    /* one of the sockets in one of the fd_sets was invalid */
    set_errno(EBADF);
    lwip_select_dec_sockets_used(maxfdp1, &used_sockets);
    return -1;
 } else if (nready > 0) {
    /* one or more sockets are set, no need to wait */
    LWIP_DEBUGF(SOCKETS_DEBUG, ("lwip_select: nready=%d\n", nready));
 } else {
    /* If we don't have any current events, then suspend if we are supposed to */
    if (timeout && timeout->tv_sec == 0 && timeout->tv_usec == 0) {
      LWIP_DEBUGF(SOCKETS_DEBUG, ("lwip_select: no timeout, returning 0\n"));
      /* This is OK as the local fdsets are empty and nready is zero,
         or we would have returned earlier. */
    } else {
      /* None ready: add our semaphore to list:
         We don't actually need any dynamic memory. Our entry on the
         list is only valid while we are in this function, so it's ok
         to use local variables (unless we're running in MPU compatible
         mode). */
      API_SELECT_CB_VAR_DECLARE(select_cb);
      API_SELECT_CB_VAR_ALLOC(select_cb, set_errno(ENOMEM); lwip_select_dec_sockets_used(maxfdp1, &used_sockets); return -1);
     memset(&API_SELECT_CB_VAR_REF(select_cb), 0, sizeof(struct lwip_select_cb));

      API_SELECT_CB_VAR_REF(select_cb).readset = readset;
      API_SELECT_CB_VAR_REF(select_cb).writeset = writeset;
     API_SELECT_CB_VAR_REF(select_cb).exceptset = exceptset;
#if LWIP_NETCONN_SEM_PER_THREAD
      API_SELECT_CB_VAR_REF(select_cb).sem = LWIP_NETCONN_THREAD_SEM_GET();
#else /* LWIP_NETCONN_SEM_PER_THREAD */
      if (sys_sem_new(&API_SELECT_CB_VAR_REF(select_cb).sem, 0) != ERR_OK) {
        /* failed to create semaphore */
        set_errno(ENOMEM);
        lwip_select_dec_sockets_used(maxfdp1, &used_sockets);
       API_SELECT_CB_VAR_FREE(select_cb);
        return -1;
      }
#endif /* LWIP_NETCONN_SEM_PER_THREAD */

     lwip_link_select_cb(&API_SELECT_CB_VAR_REF(select_cb));

      /* Increase select_waiting for each socket we are interested in */
      maxfdp2 = maxfdp1;
      for (i = LWIP_SOCKET_OFFSET; i < maxfdp1; i++) {
        if ((readset && FD_ISSET(i, readset)) ||
            (writeset && FD_ISSET(i, writeset)) ||
            (exceptset && FD_ISSET(i, exceptset))) {
          struct lwip_sock *sock;
          SYS_ARCH_PROTECT(lev);
          sock = tryget_socket_unconn_locked(i);
          if (sock != NULL) {
            sock->select_waiting++;//读写异常通知,并且socket是存在的,则会将select_wainting增加1
            if (sock->select_waiting == 0) {
              /* overflow - too many threads waiting */
              sock->select_waiting--;
              nready = -1;
              maxfdp2 = i;
              SYS_ARCH_UNPROTECT(lev);
              done_socket(sock);
              set_errno(EBUSY);
              break;
            }
            SYS_ARCH_UNPROTECT(lev);
            done_socket(sock);
          } else {
            /* Not a valid socket */
            nready = -1;
            maxfdp2 = i;
            SYS_ARCH_UNPROTECT(lev);
            set_errno(EBADF);
            break;
          }
        }
      }
        
      if (nready >= 0) {
        /* Call lwip_selscan again: there could have been events between
          the last scan (without us on the list) and putting us on the list! */

//执行完上述操作,再次扫描一次是否有socket有事件产生
        nready = lwip_selscan(maxfdp1, readset, writeset, exceptset, &lreadset, &lwriteset, &lexceptset);
        if (!nready) {
          /* Still none ready, just wait to be woken */
          if (timeout == 0) {
            /* Wait forever */
            msectimeout = 0;
          } else {
            long msecs_long = ((timeout->tv_sec * 1000) + ((timeout->tv_usec + 500) / 1000));
            if (msecs_long <= 0) {
              /* Wait 1ms at least (0 means wait forever) */
              msectimeout = 1;
            } else {
              msectimeout = (u32_t)msecs_long;
            }
          }
         //休眠指定时间,让出cpu控制权
          waitres = sys_arch_sem_wait(SELECT_SEM_PTR(API_SELECT_CB_VAR_REF(select_cb).sem), msectimeout);
#if LWIP_NETCONN_SEM_PER_THREAD
          waited = 1;
#endif
        }
      }
      
      /* Decrease select_waiting for each socket we are interested in */
      for (i = LWIP_SOCKET_OFFSET; i < maxfdp2; i++) {
        if ((readset && FD_ISSET(i, readset)) ||
            (writeset && FD_ISSET(i, writeset)) ||
            (exceptset && FD_ISSET(i, exceptset))) {
          struct lwip_sock *sock;
          SYS_ARCH_PROTECT(lev);
          sock = tryget_socket_unconn_locked(i);
          if (sock != NULL) {
            /* for now, handle select_waiting==0... */
           LWIP_ASSERT("sock->select_waiting > 0", sock->select_waiting > 0);
            if (sock->select_waiting > 0) {
              sock->select_waiting--;//休眠结束, 将对应socket->select_waiting减1
            }
            SYS_ARCH_UNPROTECT(lev);
            done_socket(sock);
          } else {
            SYS_ARCH_UNPROTECT(lev);
            /* Not a valid socket */
            nready = -1;
            set_errno(EBADF);
          }
        }
      }

     lwip_unlink_select_cb(&API_SELECT_CB_VAR_REF(select_cb));

#if LWIP_NETCONN_SEM_PER_THREAD
      if (API_SELECT_CB_VAR_REF(select_cb).sem_signalled && (!waited || (waitres == SYS_ARCH_TIMEOUT))) {
        /* don't leave the thread-local semaphore signalled */
       sys_arch_sem_wait(API_SELECT_CB_VAR_REF(select_cb).sem, 1);
      }
#else /* LWIP_NETCONN_SEM_PER_THREAD */
      sys_sem_free(&API_SELECT_CB_VAR_REF(select_cb).sem);
#endif /* LWIP_NETCONN_SEM_PER_THREAD */
      API_SELECT_CB_VAR_FREE(select_cb);

      if (nready < 0) {
        /* This happens when a socket got closed while waiting */
        lwip_select_dec_sockets_used(maxfdp1, &used_sockets);
        return -1;
      }

      if (waitres == SYS_ARCH_TIMEOUT) {
        /* Timeout */
        LWIP_DEBUGF(SOCKETS_DEBUG, ("lwip_select: timeout expired\n"));
        /* This is OK as the local fdsets are empty and nready is zero,
           or we would have returned earlier. */
      } else {
        /* See what's set now after waiting */
        nready = lwip_selscan(maxfdp1, readset, writeset, exceptset, &lreadset, &lwriteset, &lexceptset);
        LWIP_DEBUGF(SOCKETS_DEBUG, ("lwip_select: nready=%d\n", nready));
      }
    }
 }

 lwip_select_dec_sockets_used(maxfdp1, &used_sockets);
 set_errno(0);
 if (readset) {
    *readset = lreadset;
 }
 if (writeset) {
    *writeset = lwriteset;
 }
 if (exceptset) {
    *exceptset = lexceptset;
 }
 return nready;
}

 

以上代码最核心的就是socket->select_waiting加1和减1的地方,当socket存在且的确需要监听事件,且并不是进来事件就已经产生或者已经超时,一定会加1;然后线程会有可能会进行休眠;正常情况下,休眠结束后,socket->select_waiting减1,离开该函数,socket->select_waiting恢复原值。但是,如果在休眠期间进行了close(socket),则通过try_socket(socket)获取不到socket结构体,则socket->select_waiting不会进行减1,后面执行一系列语句后,退出该函数,socket->select_waiting没有恢复原值,且比进来时大1。针对该函数,socket->select_waiting加1的次数是>=减1的次数,所以如果只要在函数退出时没有恢复原值,则socket->select_waiting永远不可能再减为0了,此时socket资源就出现了假占用,该socket再也不能被其他人使用了。

 

lwip_select函数实现的具体流程如下:

select

Select的实现有个重要的结构体lwip_sock,其原型如下:

/** Contains all internal pointers and states used for a socket */
struct lwip_sock {
 /** sockets currently are built on netconns, each socket has one netconn */
 struct netconn *conn;
 /** data that was left from the previous read */
 union lwip_sock_lastdata lastdata;
#if LWIP_SOCKET_SELECT || LWIP_SOCKET_POLL
 /** number of times data was received, set by event_callback(),
      tested by the receive and select functions */
 s16_t rcvevent;
 /** number of times data was ACKed (free send buffer), set by event_callback(),
      tested by select */
 u16_t sendevent;
 /** error happened for this socket, set by event_callback(), tested by select */
 u16_t errevent;
 /** counter of how many threads are waiting for this socket using select */
 SELWAIT_T select_waiting;
#endif /* LWIP_SOCKET_SELECT || LWIP_SOCKET_POLL */
#if LWIP_NETCONN_FULLDUPLEX
 /* counter of how many threads are using a struct lwip_sock (not the 'int') */
 u8_t fd_used;
 /* status of pending close/delete actions */
 u8_t fd_free_pending;
#define LWIP_SOCK_FD_FREE_TCP  1
#define LWIP_SOCK_FD_FREE_FREE 2
#endif

#ifdef SAL_USING_POSIX
 rt_wqueue_t wait_head;
#endif
};

 

在socket数据接收时,lwip_sock利用netconn相关的接收函数获得一个pbuf(对于TCP)或者一个netbuf(对于UDP)数据,而这二者封装的数据可能大于socket用户指定的数据接收长度,因此在这种情况下,这两个数据包需要暂时保存在socket中,以待用户下一次读取,这里lastdata就用于指向未被用户完全读取的数据包,而lastoffset则指向了未读取的数据在数据包中的偏移。lwip_sock最后的五个字段是为select机制实现时使用的。

lwip_socket是上层Socket API中的实现,它对netconn结构的封装和增强,描述一个具体连接。它基于内核netconn来实现所有逻辑,conn指向了与socket对应的netconn结构。Netconn原型如下:

/** A callback prototype to inform about events for a netconn */
typedef void (* netconn_callback)(struct netconn *, enum netconn_evt, u16_t len);

/** A netconn descriptor */
struct netconn {
 /** type of the netconn (TCP, UDP or RAW) */
 enum netconn_type type;
 /** current state of the netconn */
 enum netconn_state state;
 /** the lwIP internal protocol control block */
 union {
    struct ip_pcb  *ip;
    struct tcp_pcb *tcp;
    struct udp_pcb *udp;
    struct raw_pcb *raw;
 } pcb;
 /** the last asynchronous unreported error this netconn had */
 err_t pending_err;
#if !LWIP_NETCONN_SEM_PER_THREAD
 /** sem that is used to synchronously execute functions in the core context */
 sys_sem_t op_completed;
#endif
 /** mbox where received packets are stored until they are fetched
      by the netconn application thread (can grow quite big) */
 sys_mbox_t recvmbox;
#if LWIP_TCP
 /** mbox where new connections are stored until processed
      by the application thread */
 sys_mbox_t acceptmbox;
#endif /* LWIP_TCP */
#if LWIP_NETCONN_FULLDUPLEX
 /** number of threads waiting on an mbox. This is required to unblock
      all threads when closing while threads are waiting. */
 int mbox_threads_waiting;
#endif
 /** only used for socket layer */
#if LWIP_SOCKET
 int socket;
#endif /* LWIP_SOCKET */
#if LWIP_SO_SNDTIMEO
 /** timeout to wait for sending data (which means enqueueing data for sending
      in internal buffers) in milliseconds */
 s32_t send_timeout;
#endif /* LWIP_SO_RCVTIMEO */
#if LWIP_SO_RCVTIMEO
 /** timeout in milliseconds to wait for new data to be received
      (or connections to arrive for listening netconns) */
 u32_t recv_timeout;
#endif /* LWIP_SO_RCVTIMEO */
#if LWIP_SO_RCVBUF
 /** maximum amount of bytes queued in recvmbox
      not used for TCP: adjust TCP_WND instead! */
 int recv_bufsize;
 /** number of bytes currently in recvmbox to be received,
      tested against recv_bufsize to limit bytes on recvmbox
      for UDP and RAW, used for FIONREAD */
 int recv_avail;
#endif /* LWIP_SO_RCVBUF */
#if LWIP_SO_LINGER
   /** values <0 mean linger is disabled, values > 0 are seconds to linger */
 s16_t linger;
#endif /* LWIP_SO_LINGER */
 /** flags holding more netconn-internal state, see NETCONN_FLAG_* defines */
 u8_t flags;
#if LWIP_TCP
 /** TCP: when data passed to netconn_write doesn't fit into the send buffer,
      this temporarily stores the message.
      Also used during connect and close. */
 struct api_msg *current_msg;
#endif /* LWIP_TCP */
 /** A callback function that is informed about events for this netconn */
 netconn_callback callback;
};

 

前文已经提到,套接字集合初始化时,会向netconn结构注册回调函数event_callback,这个回调函数就是结构体netconn中netconn_callback,接下来看看netconn_callback函数原型:

/**
 * Callback registered in the netconn layer for each socket-netconn.
 * Processes recvevent (data available) and wakes up tasks waiting for select.
 *
 * @note for LWIP_TCPIP_CORE_LOCKING any caller of this function
 * must have the core lock held when signaling the following events
 * as they might cause select_list_cb to be checked:
 *  NETCONN_EVT_RCVPLUS 数据被内核接收则会产生该事件
 *  NETCONN_EVT_SENDPLUS数据成功发送则产生该事件
 *  NETCONN_EVT_ERROR连接错误则产生该事件
 * This requirement will be asserted in select_check_waiters()
 */
static void
event_callback(struct netconn *conn, enum netconn_evt evt, u16_t len)
{
 int s, check_waiters;
 struct lwip_sock *sock;
 SYS_ARCH_DECL_PROTECT(lev);

 LWIP_UNUSED_ARG(len);

 /* Get socket */
 if (conn) {
    s = conn->socket;
    if (s < 0) {
      /* Data comes in right away after an accept, even though
       * the server task might not have created a new socket yet.
      * Just count down (or up) if that's the case and we
       * will use the data later. Note that only receive events
       * can happen before the new socket is set up. */
      SYS_ARCH_PROTECT(lev);
      if (conn->socket < 0) {
        if (evt == NETCONN_EVT_RCVPLUS) {
          /* conn->socket is -1 on initialization
             lwip_accept adjusts sock->recvevent if conn->socket < -1 */
          conn->socket--;
        }
        SYS_ARCH_UNPROTECT(lev);
        return;
      }
      s = conn->socket;
      SYS_ARCH_UNPROTECT(lev);
    }

    sock = get_socket(s);//获取socket对应的结构
    if (!sock) {
      return;
    }
 } else {
    return;
 }

 check_waiters = 1;
 //进入临界区,根据事件来更新socket的event值
 SYS_ARCH_PROTECT(lev);
 /* Set event as required */
 switch (evt) {
    case NETCONN_EVT_RCVPLUS://数据被内核收到
      sock->rcvevent++;
      if (sock->rcvevent > 1) {
        check_waiters = 0;
      }
      break;
    case NETCONN_EVT_RCVMINUS://数据被用户读取
      sock->rcvevent--;
      check_waiters = 0;
      break;
    case NETCONN_EVT_SENDPLUS://输出发送成功
      if (sock->sendevent) {
        check_waiters = 0;
      }
      sock->sendevent = 1;
      break;
    case NETCONN_EVT_SENDMINUS://用户写入数据到缓冲区
      sock->sendevent = 0;
      check_waiters = 0;
      break;
    case NETCONN_EVT_ERROR://连接错误
      sock->errevent = 1;
      break;
    default:
      LWIP_ASSERT("unknown event", 0);
      break;
 }
  //事件设置完毕,唤醒阻塞的select函数
 if (sock->select_waiting && check_waiters) {
    /* Save which events are active */
    int has_recvevent, has_sendevent, has_errevent;
    has_recvevent = sock->rcvevent > 0;//数据可读事件
    has_sendevent = sock->sendevent != 0;//数据可写事件
    has_errevent = sock->errevent != 0;//数据异常事件
    SYS_ARCH_UNPROTECT(lev);
    /* Check any select calls waiting on this socket */
    select_check_waiters(s, has_recvevent, has_sendevent, has_errevent);
 } else {
    SYS_ARCH_UNPROTECT(lev);
 }
 done_socket(sock);
}

 

综上,event_callback 的本质就是readset、writeset、exceptset集合的监听,并对rcvevent、sendevent、errevent的填写,并阻塞的lwip_select函数发送信号量。而lwip_select的本质就是对rcvevent、sendevent、errevent的读取,并执行相应的操作,lwip_select主要是通过lwip_selscan来扫描事件的。

 

4.2 lwip_poll实现

LWIP也完全实现poll,函数名是lwip_poll。lwip_poll和lwip_select的实现机制差不多,只是lwip_poll使用pollfd的结构来存储描述符的,它是基于链表来存储的,这样lwip_poll函数没有最大文件描述符数量的限制。lwip_poll函数原型如下:

int lwip_poll(struct pollfd *fds, nfds_t nfds, int timeout)
{
 u32_t waitres = 0;
 int nready;
 u32_t msectimeout;
#if LWIP_NETCONN_SEM_PER_THREAD
 int waited = 0;
#endif

 LWIP_DEBUGF(SOCKETS_DEBUG, ("lwip_poll(%p, %d, %d)\n",
                  (void*)fds, (int)nfds, timeout));
 LWIP_ERROR("lwip_poll: invalid fds", ((fds != NULL && nfds > 0) || (fds == NULL && nfds == 0)),
             set_errno(EINVAL); return -1;);

 lwip_poll_inc_sockets_used(fds, nfds);

 /* Go through each struct pollfd to count number of structures
     which currently match */
 nready = lwip_pollscan(fds, nfds, LWIP_POLLSCAN_CLEAR);

 if (nready < 0) {
    lwip_poll_dec_sockets_used(fds, nfds);
    return -1;
 }

 /* If we don't have any current events, then suspend if we are supposed to */
 if (!nready) {
    API_SELECT_CB_VAR_DECLARE(select_cb);

    if (timeout == 0) {
      LWIP_DEBUGF(SOCKETS_DEBUG, ("lwip_poll: no timeout, returning 0\n"));
      goto return_success;
    }
    API_SELECT_CB_VAR_ALLOC(select_cb, set_errno(EAGAIN); lwip_poll_dec_sockets_used(fds, nfds); return -1);
   memset(&API_SELECT_CB_VAR_REF(select_cb), 0, sizeof(struct lwip_select_cb));

    /* None ready: add our semaphore to list:
       We don't actually need any dynamic memory. Our entry on the
       list is only valid while we are in this function, so it's ok
       to use local variables. */

    API_SELECT_CB_VAR_REF(select_cb).poll_fds = fds;
    API_SELECT_CB_VAR_REF(select_cb).poll_nfds = nfds;
#if LWIP_NETCONN_SEM_PER_THREAD
    API_SELECT_CB_VAR_REF(select_cb).sem = LWIP_NETCONN_THREAD_SEM_GET();
#else /* LWIP_NETCONN_SEM_PER_THREAD */
    if (sys_sem_new(&API_SELECT_CB_VAR_REF(select_cb).sem, 0) != ERR_OK) {
      /* failed to create semaphore */
      set_errno(EAGAIN);
      lwip_poll_dec_sockets_used(fds, nfds);
      API_SELECT_CB_VAR_FREE(select_cb);
      return -1;
    }
#endif /* LWIP_NETCONN_SEM_PER_THREAD */

   lwip_link_select_cb(&API_SELECT_CB_VAR_REF(select_cb));

    /* Increase select_waiting for each socket we are interested in.
       Also, check for events again: there could have been events between
       the last scan (without us on the list) and putting us on the list! */
    nready = lwip_pollscan(fds, nfds, LWIP_POLLSCAN_INC_WAIT);

    if (!nready) {
      /* Still none ready, just wait to be woken */
      if (timeout < 0) {
        /* Wait forever */
        msectimeout = 0;
      } else {
        /* timeout == 0 would have been handled earlier. */
        LWIP_ASSERT("timeout > 0", timeout > 0);
        msectimeout = timeout;
      }
      waitres = sys_arch_sem_wait(SELECT_SEM_PTR(API_SELECT_CB_VAR_REF(select_cb).sem), msectimeout);
#if LWIP_NETCONN_SEM_PER_THREAD
      waited = 1;
#endif
    }

    /* Decrease select_waiting for each socket we are interested in,
       and check which events occurred while we waited. */
    nready = lwip_pollscan(fds, nfds, LWIP_POLLSCAN_DEC_WAIT);

   lwip_unlink_select_cb(&API_SELECT_CB_VAR_REF(select_cb));

#if LWIP_NETCONN_SEM_PER_THREAD
    if (select_cb.sem_signalled && (!waited || (waitres == SYS_ARCH_TIMEOUT))) {
      /* don't leave the thread-local semaphore signalled */
     sys_arch_sem_wait(API_SELECT_CB_VAR_REF(select_cb).sem, 1);
    }
#else /* LWIP_NETCONN_SEM_PER_THREAD */
   sys_sem_free(&API_SELECT_CB_VAR_REF(select_cb).sem);
#endif /* LWIP_NETCONN_SEM_PER_THREAD */
    API_SELECT_CB_VAR_FREE(select_cb);

    if (nready < 0) {
      /* This happens when a socket got closed while waiting */
      lwip_poll_dec_sockets_used(fds, nfds);
      return -1;
    }

    if (waitres == SYS_ARCH_TIMEOUT) {
      /* Timeout */
      LWIP_DEBUGF(SOCKETS_DEBUG, ("lwip_poll: timeout expired\n"));
      goto return_success;
    }
 }

 LWIP_DEBUGF(SOCKETS_DEBUG, ("lwip_poll: nready=%d\n", nready));
return_success:
 lwip_poll_dec_sockets_used(fds, nfds);
 set_errno(0);
 return nready;
}

 

和lwip_select一样也是对事件进行扫描,只是扫描函数是lwip_pollscan而已。后面的内容就不在分析,有兴趣请参看LWIP源码。

lwip_poll函数实现的具体流程如下:

select

 

 

5并发服务器实现

前文讲解了select/poll机制在LWIP的实现,接下来将使用select/poll来实现并发服务器。这里以select为例。

select并发服务器模型:

socket(...); // 创建套接字
bind(...);   // 绑定
listen(...); // 监听
 
while(1)
{
    if(select(...) > 0) // 检测监听套接字是否可读
    {
        if(FD_ISSET(...)>0) // 套接字可读,证明有新客户端连接服务器 
        {
            accpet(...);// 取出已经完成的连接
            process(...);// 处理请求,反馈结果
        }
    }
    close(...); // 关闭连接套接字:accept()返回的套接字
}

 

因此,基于select实现的并发服务器模型如下:

select

从流程上来看,使用select函数进行IO请求和同步阻塞模型没有太大的区别,甚至还多了添加监视socket,以及调用select函数的额外操作,效率更差。但是,使用select以后最大的优势是用户可以在一个线程内同时处理多个socket的IO请求。用户可以注册多个socket,然后不断地调用select读取被激活的socket,即可达到在同一个线程内同时处理多个IO请求的目的。而在同步阻塞模型中,必须通过多线程的方式才能达到这个目的。

 

Server:

/**
 ******************************************************************************
 * @file                server.c
 * @author              BruceOu
 * @rtt version         V4.0.3
 * @version             V1.0
 * @date                2022-06-08
 * @blog               https://blog.bruceou.cn/
 * @Official Accounts   嵌入式实验楼
 * @brief               基于select的服务器
 ******************************************************************************
 */
#include 
#include 
#include 
#include 
#include 
#include 

#define SERVER_PORT   8888
#define BUFF_SIZE 1024

static char recvbuff[BUFF_SIZE];

static void net_server_thread_entry(void *parameter)
{
    int sfd, cfd, maxfd, i, nready, n;

    struct sockaddr_in server_addr, client_addr;

    struct netdev *netdev = RT_NULL;

    char sendbuff[] = "Hello client!";

    socklen_t client_addr_len;
    fd_set all_set, read_set;

    //FD_SETSIZE里面包含了服务器的fd
    int clientfds[FD_SETSIZE - 1];

    /* 通过名称获取 netdev 网卡对象 */
    netdev = netdev_get_by_name((char*)parameter);
    if (netdev == RT_NULL)
    {
        rt_kprintf("get network interface device(%s) failed.\n", (char*)parameter);
    }

    //创建socket
    if ((sfd = socket(AF_INET, SOCK_STREAM, 0)) < 0)
    {
        rt_kprintf("Socket create failed.\n");
    }

    server_addr.sin_family = AF_INET;
    server_addr.sin_port = htons(SERVER_PORT);
    //server_addr.sin_addr.s_addr = htonl(INADDR_ANY);
    /* 获取网卡对象中 IP 地址信息 */
    server_addr.sin_addr.s_addr = netdev->ip_addr.addr;

    //绑定socket
    if (bind(sfd, (struct sockaddr *)&server_addr, sizeof(struct sockaddr)) < 0)
    {
        rt_kprintf("socket bind failed.\n");
        closesocket(sfd);
    }
    rt_kprintf("socket bind network interface device(%s) success!\n", netdev->name);

    //监听socket
    if(listen(sfd, 5) == -1)
    {
        rt_kprintf("listen error");
    }
    else
    {
        rt_kprintf("listening...\n");
    }

    client_addr_len = sizeof(client_addr);

    //初始化 maxfd 等于 sfd
    maxfd = sfd;

    //清空fdset
    FD_ZERO(&all_set);

    //把sfd文件描述符添加到集合中
    FD_SET(sfd, &all_set);

    //初始化客户端fd的集合
    for(i = 0; i < FD_SETSIZE -1 ; i++)
    {
        //初始化为-1
        clientfds[i] = -1;
    }
    while(1)
    {
        //每次select返回之后,fd_set集合就会变化,再select时,就不能使用,
        //所以我们要保存设置fd_set 和 读取的fd_set
        read_set = all_set;
        nready = select(maxfd + 1, &read_set, NULL, NULL, NULL);

        //没有超时机制,不会返回0
        if(nready < 0)
        {
           rt_kprintf("select error \r\n");

        }

        //判断监听的套接字是否有数据
        if(FD_ISSET(sfd, &read_set))
        {
            //有客户端进行连接了
            cfd = accept(sfd, (struct sockaddr *)&client_addr, &client_addr_len);
            if(cfd < 0)
            {
                rt_kprintf("accept socket error\r\n");
                //继续select
                continue;
            }
            rt_kprintf("new client connect fd = %d\r\n", cfd);

            //把新的cfd 添加到fd_set集合中
            FD_SET(cfd, &all_set);

            //更新要select的maxfd
            maxfd = (cfd > maxfd)?cfd:maxfd;

            //把新的cfd 保存到cfds集合中
            for(i = 0; i < FD_SETSIZE -1 ; i++)
            {
                if(clientfds[i] == -1)
                {
                    clientfds[i] = cfd;
                    //退出,不需要添加
                    break;
                }
            }

            //没有其他套接字需要处理:这里防止重复工作,就不去执行其他任务
            if(--nready == 0)
            {
                //继续select
                continue;
            }
        }

        //遍历所有的客户端文件描述符
        for(i = 0; i < FD_SETSIZE -1 ; i++)
        {
            if(clientfds[i] == -1)
            {
                //继续遍历
                continue;
            }

            //判断是否在fd_set集合里面
            if(FD_ISSET(clientfds[i], &read_set))
            {
                n = recv(clientfds[i], recvbuff, sizeof(recvbuff), 0);
                rt_kprintf("clientfd %d:  %s \r\n",clientfds[i], recvbuff);

                if(n <= 0)
                {
                    //从集合里面清除
                    FD_CLR(clientfds[i], &all_set);
                    //当前的客户端fd 赋值为-1
                    clientfds[i] = -1;                }
                else
                {
                    //写回客户端
                    n = send(clientfds[i], sendbuff, strlen(sendbuff), 0);
                    if(n < 0)
                    {
                        //从集合里面清除
                        FD_CLR(clientfds[i], &all_set);

                        //当前的客户端fd 赋值为-1
                        clientfds[i] = -1;
                    }
                }
            }
        }
    }
}

static int server(int argc, char **argv)
{
    rt_err_t ret = RT_EOK;

    if (argc != 2)
    {
        rt_kprintf("bind_test [netdev_name]  --bind network interface device by name.\n");
        return -RT_ERROR;
    }

    /* 创建 serial 线程 */
    rt_thread_t thread = rt_thread_create("server",
                                         net_server_thread_entry,
                                         argv[1],
                                          4096,
                                          10,
                                          10);

    /* 创建成功则启动线程 */
    if (thread != RT_NULL)
    {
        rt_thread_startup(thread);
    }
    else
    {
        ret = RT_ERROR;
    }

    return ret;
}


#ifdef FINSH_USING_MSH
#include 
MSH_CMD_EXPORT(server, network interface device test);
#endif /* FINSH_USING_MSH */

 

Client:【Linux版】

#include 
#include 
#include 
#include 
#include 
#include 
#include 
#include 
#include 

#define SERVPORT 8888
 
int main(int argc,char *argv[]) 
{
    char sendbuf[] = "Client1 : Hello Rtthread!";
    char recvbuf[2014];

    int sockfd,sendbytes;
    struct sockaddr_in serv_addr;//需要连接的服务器地址信息

    if (argc != 2)
    {
       perror("init error");
    }

    //1.创建socket
    //AF_INET 表示IPV4
    //SOCK_STREAM 表示TCP
    if((sockfd = socket(AF_INET,SOCK_STREAM,0)) < 0) 
    {
        perror("socket");
        exit(1);
    }

    //填充服务器地址信息
    serv_addr.sin_family       = AF_INET; //网络层的IP协议: IPV4
    serv_addr.sin_port          = htons(SERVPORT); //传输层的端口号
    serv_addr.sin_addr.s_addr   = inet_addr(argv[1]); //网络层的IP地址: 实际的服务器IP地址
    bzero(&(serv_addr.sin_zero),8); //保留的8字节置零

    //2.发起对服务器的连接信息
    //三次握手,需要将sockaddr_in类型的数据结构强制转换为sockaddr
    if((connect(sockfd,(struct sockaddr *)&serv_addr,sizeof(struct sockaddr))) < 0) {
        perror("connect failed!");
        exit(1);
    }

    printf("connect successful! \n");

    //3.发送消息给服务器端
    while (1)
    {
        send(sockfd, sendbuf, strlen(sendbuf), 0);

        recv(sockfd, recvbuf, sizeof(recvbuf), 0);

       printf("Server : %s \n", recvbuf);

       sleep(2);
    }

    //4.关闭
    close(sockfd);

}

 

Client:【RT-Thread版】

/**
 ******************************************************************************
 * @file                client.c
 * @author              BruceOu
 * @rtt version         V4.0.3
 * @version             V1.0
 * @date                2022-06-01
 * @blog               https://blog.bruceou.cn/
 * @Official Accounts   嵌入式实验楼
 * @brief               客户端
 ******************************************************************************
 */
#include 
#include 
#include 
#include 
#include 
#include 

#define SERVER_HOST   "192.168.101.8"
#define SERVER_PORT   8888

static int client(int argc, char **argv)
{
    struct sockaddr_in client_addr;
    struct sockaddr_in server_addr;
    struct netdev *netdev = RT_NULL;
    int sockfd = -1;

    char sendbuf[] = "Hello RT-Thread! \r\n";
    char recvbuf[2014];

    if (argc != 2)
    {
        rt_kprintf("bind_test [netdev_name]  --bind network interface device by name.\n");
        return -RT_ERROR;
    }

    /* 通过名称获取 netdev 网卡对象 */
    netdev = netdev_get_by_name(argv[1]);
    if (netdev == RT_NULL)
    {
        rt_kprintf("get network interface device(%s) failed.\n", argv[1]);
        return -RT_ERROR;
    }

    if ((sockfd = socket(AF_INET, SOCK_STREAM, 0)) < 0)
    {
        rt_kprintf("Socket create failed.\n");
        return -RT_ERROR;
    }

    /* 初始化需要绑定的客户端地址 */
    client_addr.sin_family = AF_INET;
    client_addr.sin_port = htons(8080);
    /* 获取网卡对象中 IP 地址信息 */
    client_addr.sin_addr.s_addr = netdev->ip_addr.addr;
    rt_memset(&(client_addr.sin_zero), 0, sizeof(client_addr.sin_zero));

    if (bind(sockfd, (struct sockaddr *)&client_addr, sizeof(struct sockaddr)) < 0)
    {
        rt_kprintf("socket bind failed.\n");
        closesocket(sockfd);
        return -RT_ERROR;
    }
    rt_kprintf("socket bind network interface device(%s) success!\n", netdev->name);

    /* 初始化预连接的服务端地址 */
    server_addr.sin_family = AF_INET;
    server_addr.sin_port = htons(SERVER_PORT);
    server_addr.sin_addr.s_addr = inet_addr(SERVER_HOST);
    rt_memset(&(server_addr.sin_zero), 0, sizeof(server_addr.sin_zero));

    /* 连接到服务端 */
    if (connect(sockfd, (struct sockaddr *)&server_addr, sizeof(struct sockaddr)) < 0)
    {
        rt_kprintf("socket connect failed!\n");
        closesocket(sockfd);
        return -RT_ERROR;
    }
    else
    {
        rt_kprintf("socket connect success!\n");
    }

    while (1)
    {
        send(sockfd, sendbuf, strlen(sendbuf), 0);

        recv(sockfd, recvbuf, sizeof(recvbuf), 0);

        fputs(recvbuf, stdout);

        memset(recvbuf, 0, sizeof(recvbuf));

        rt_thread_mdelay(500);
    }

    /* 关闭连接 */
    closesocket(sockfd);
    return RT_EOK;
}

#ifdef FINSH_USING_MSH
#include 
MSH_CMD_EXPORT(client, network interface device test);
#endif /* FINSH_USING_MSH */

接下来就是验证了,关于ART-Pi的联网部分就不再赘述了有不懂的看前面的章节。

现在ART-Pi上开启服务器:

Server:

 

select

然后开启客户端,笔者的客户端在Ubuntu上运行的:

Client:

select

笔者这里使用的客户端只有两个,有兴趣的也可以使用多个客户端。

当然啦,如果懒得写客户端,也可使用网络调试助手测试。

打开APP阅读更多精彩内容
声明:本文内容及配图由入驻作者撰写或者入驻合作网站授权转载。文章观点仅代表作者本人,不代表电子发烧友网立场。文章及其配图仅供工程师学习之用,如有内容侵权或者其他违规问题,请联系本站处理。 举报投诉

全部0条评论

快来发表一下你的评论吧 !

×
20
完善资料,
赚取积分