Archive

Archive for March, 2012

lthread代码分析(四)

March 20th, 2012 No comments

 我们知道,一个调用阻塞socket的recv操作会一直等到有数据到达以后才会返回(采用了MSG_DONTWAIT标志的不算)。如果通过异步socket来实现,那么需要使用select或者epoll,然后通过类似回调的方式来告知有数据到达。有了lthread以后呢,这个步骤就可以由lthread来实现,而用户代码只需要像以前使用阻塞socket那样直接调用recv(这里要替换成lthread的recv函数了)就可以了。在代码逻辑上会比直接使用select/epoll来的简洁得多。我们来看看lthread里面关于socket部分的实现。

#define LTHREAD_RECV(x, y)                                  
x {                                                         
    ssize_t ret = 0;                                        
    lthread_t *lt = lthread_get_sched()->current_lthread;   
    while (1) {                                             
        if (lt->state & bit(LT_FDEOF))                      
            return -1;                                      
        ret = y;                                            
        if (ret == -1 && errno != EAGAIN)                   
            return -1;                                      
        if ((ret == -1 && errno == EAGAIN)) {               
            if (timeout)                                    
                _sched_lthread(lt, timeout);                
            _lthread_wait_for(lt, fd, LT_READ);             
            if (lt->state & bit(LT_EXPIRED))                
                return -2;                                  
        }                                                   
        if (ret >= 0)                                       
            return ret;                                     
    }                                                       
}

上面的宏定义里面,我们可以看出当一个接收数据操作(可能是recv也可能是recvfrom)被调用以后,如果遇到返回值为EAGAIN,那么就会调用_sched_lthread等待一个timeout时间。在等待的过程中如果该相关的fd上有可读事件发生,那么相关的_lthread就会被调用到。如果超时以后还没有事件到达,这个recv操作就会返回-2.

#define LTHREAD_SEND(x, y)                                  
x {                                                         
    ssize_t ret = 0;                                        
    ssize_t sent = 0;                                       
    lthread_t *lt = lthread_get_sched()->current_lthread;   
    while (sent != length) {                                
        if (lt->state & bit(LT_FDEOF))                      
            return -1;                                      
        ret = y;                                            
        if (ret == 0)                                       
            return sent;                                    
        if (ret > 0)                                        
            sent += ret;                                    
        if (ret == -1 && errno != EAGAIN)                   
            return -1;                                      
        if (ret == -1 && errno == EAGAIN)                   
            _lthread_wait_for(lt, fd, LT_WRITE);            
    }                                                       
    return sent;                                            
}                                                           

同理,发送数据也是一样的,当系统缓冲区能够容纳数据的时候,返回成功。否则等待相关fd的可写事件发生以后再向其缓冲区写入数据。

当然,上面的发送和接收表明,相关的socket必须是非阻塞的,否则无法达到这些效果。下面的lthread_socket函数就说明了这个问题

int
lthread_socket(int domain, int type, int protocol)
{
    int fd;
#if defined(__FreeBSD__) || defined(__APPLE__)
    int set = 1;
#endif

    if ((fd = socket(domain, type, protocol)) == -1) {
        perror("Failed to create a new socket");
        return -1;
    }
    /*把socket设置被非阻塞的*/
    if ((fcntl(fd, F_SETFL, O_NONBLOCK)) == -1) {
        close(fd);
        perror("Failed to set socket properties");
        return -1;
    }

#if defined(__FreeBSD__) || defined(__APPLE__)
    if (setsockopt(fd, SOL_SOCKET, SO_NOSIGPIPE, &set, sizeof(int)) == -1) {
        close(fd);
        perror("Failed to set socket properties");
        return -1;
    }
#endif

    return fd;
}

当然其他的还有accept和connect之类的操作基本上也是差不多的,只要遇到EAGAIN就会调用lthread调度器切换到其他的lthread上,同时监听fd感性却的事件。
有了这个lthread和socket的绑定,撰写concurrent的socket程序应该会简单很多的

Categories: programming Tags: ,

lthread代码分析(三)

March 16th, 2012 No comments

前面我们查看了lthread的创建以及调度器的运行代码,那么在不同的lthread中到底是如何切换的呢,我们现在来看一下lthread的切换代码lthread_resume函数。

int
_lthread_resume(lthread_t *lt)
{
    if (lt->state & bit(LT_NEW))/*如果是用lthread_create新创建的lthread,那么就初始化一下一些必要的数据*/
        _lthread_init(lt);

    _restore_exec_state(lt);/*这里是从lt中回复出当前的一些必要的执行环境数据,主要的是栈空间数据*/

    lthread_get_sched()->current_lthread = lt; /*设置调度器的当前运行lthread为lt*/
    _switch(&lt->st, &lt->sched->st);/*切换cpu执行代码位置,这里已经开始执行lt的代码了*/
    lthread_get_sched()->current_lthread = NULL;/*lt的代码执行完毕,或者是lt的执行函数中主动让出了执行权*/

    if (lt->state & bit(LT_EXITED)) {/*lthread执行完毕,做一些清理工作*/
        if (lt->lt_join) {
            /* if lthread was sleeping, deschedule it so that it doesn't expire. */
            _desched_lthread(lt->lt_join);
            LIST_INSERT_HEAD(&lthread_get_sched()->new, lt->lt_join, new_next);
            lt->lt_join = NULL;
        }

        /* if lthread is detached, free it, otherwise lthread_join() will */
        if (lt->state & bit(LT_DETACH))
            _lthread_free(lt);
        return -1;
    } else {
        _save_exec_state(lt);/*lt只会暂时交出了cpu的控制权,所以需要保存当前的运行环境,主要是栈空间数据*/
        /* place it in a compute scheduler if needed.  */
        if (lt->state & bit(LT_PENDING_RUNCOMPUTE)) {
            _lthread_compute_add(lt);
            lthread_get_sched()->sleeping_state++;
        }
    }

    return 0;
}

以上的部分就是如何运行一个lthread的代码,那么里面的函数saveexecstate和restoreexecstate分别是用来保存和回复运行环境的栈空间数据的。

inline int
_restore_exec_state(lthread_t *lt)
{
    if (lt->stack_size)
        memcpy(lt->st.esp, lt->stack, lt->stack_size);/*把保存下来的占空间数据copy到lt对应的sched的申请的栈空间*/

    return 0;
}

int
_save_exec_state(lthread_t *lt)
{
    void *stack_top = NULL;
    size_t size = 0;
    /*计算当前使用了多少占空间*/
    stack_top = lt->sched->stack + lt->sched->stack_size;
    size = stack_top - lt->st.esp;

    if (size && lt->stack_size != size) {
        if (lt->stack)
            free(lt->stack);
        if ((lt->stack = calloc(1, size)) == NULL) {
            perror("Failed to allocate memory to save stackn");
            abort();
        }
    }

    lt->stack_size = size;
    if (size)
        memcpy(lt->stack, lt->st.esp, size);/*把当前使用的栈上的数据保存下来,这样resume lthread的时候就可以使用了*/

    return 0;
}

这里我们可以看出,在lthread切换的时候的栈上的数据是直接保存在一个申请的额外空间里面的。最开始我也认为这样非常不好。但是如果不这样的话,就需要为每一个lthread申请一个很大的内存区域用作栈空间,这个明显是不现实的,特别是lthread的实例比较多的情况下。当然,现在这种处理方式也有一个问题。那就是lthread在执行的过程中使用了很多stack上的内存,例如char tmpBuffer[1024*1024]。这样会导致每次lthread切换的时候会有很多数据被copy过来,copy过去。如果在lthread的执行过程中需要一个大内存数据块,最好在堆上面分配内存,而不要使用栈上的内存。

Categories: programming Tags: ,

lthread代码分析(二)

March 16th, 2012 No comments

上回介绍了一些lthread的重要的数据结构,还有lthread是如何切换cpu执行指令的位置来达到任务切换的,现在来看看lthread的一些基本操作。不过说了这么多lthread到底是一个什么东西呢? 简单的说,lthread是一个用户态的轻量级线程,并且是不可比强制切换的哪一种。只有在lthread的执行过程中主动调用了一些函数,例如lthread_sleeping之类的函数,才会由sched来进行任务切换。也就是说lthread是一个non-preempty thread。

上回已经说过了一个lthread的数据结构是_lthread,当我们需要使用lthread的时候,首先需要创建一个lthread的实例 _lthread_create就是创建这个lthread实例的,类似于pthread_create这种函数.

int
lthread_create(lthread_t **new_lt, void *fun, void *arg)
{
    lthread_t *lt = NULL;
    pthread_once(&key_once, _lthread_key_create);
    sched_t *sched = lthread_get_sched();
    /*
    new_lt 这个是被创建的lthread的指针,成功以后就可以使用*new_lt进行lthread操作
    fun    和pthread的意思一样,就是lthread的执行函数
    arg    和pthread的意思一样,这个值将会传入你的提供的lthread执行函数
    */
    if (sched == NULL) {
        sched_create(0);/*创建一个调度器,这个就会在以后有所说明*/
        sched = lthread_get_sched();
        if (sched == NULL) {
            perror("Failed to create scheduler");
            return -1;
        }
    }

    if ((lt = calloc(1, sizeof(lthread_t))) == NULL) {
        perror("Failed to allocate memory for new lthread");
        return errno;
    }
    /*
    如果没有调度器话那么就创建一个,一个系统线程只会有一个调度器,所有在这个系统线程上运行的lthread都由这个调度器进行调度操作
    */
    lt->sched = sched;
    lt->stack_size = 0;
    lt->state = bit(LT_NEW);
    lt->id = sched->total_lthreads++;
    lt->fun = fun;
    lt->fd_wait = -1;
    lt->arg = arg;
    lt->birth = rdtsc();
    lt->timeout = -1;
    *new_lt = lt;
    LIST_INSERT_HEAD(&lt->sched->new, lt, new_next);
     /*
     为lthread分配内存,并且初始化一些参数。
     将当前新建的lthread的调度器指针指向当前系统线程拥有的调度器上, 初始化stack_size为0,这个时候lthread并未执行,并且将状态置为new
     记录下线程创建时间。然后把新建的线程记录到调度器的new lthread list中去,以便调度器操作
    */
    return 0;
}

上面的lthreadcreate中调用了sched_create用于创建lthread的调度器,现在我们来看看调度器是如何创建的

int
sched_create(size_t stack_size)
{
    sched_t *new_sched;
    size_t sched_stack_size = 0;
    /*选择一个栈空间大小,不设置的话就用8M这个缺省值*/
    sched_stack_size = stack_size ? stack_size : MAX_STACK_SIZE;

    if ((new_sched = calloc(1, sizeof(sched_t))) == NULL) {
        perror("Failed to initialize schedulern");
        return errno;
    }
    /*将调度器设置到系统线程的TLS中,以便在当前的系统线程的任何一个地方都可以使用*/
    pthread_setspecific(lthread_sched_key, new_sched);
    /*为调度器分配栈空间大小,其实所有这个调度器运行的的lthread都会使用这个栈*/
    if ((new_sched->stack = calloc(1, sched_stack_size)) == NULL) {
        free(new_sched);
        perror("Failed to initialize schedulern");
        return errno;
    }

    bzero(new_sched->stack, sched_stack_size);/*不知道为何要执行这个操作,感觉没有什么必要*/
    /*
     创建一个poller用于获取网络IO的事件,以后估计会有File IO的事件。
    */
    if ((new_sched->poller = create_poller()) == -1) {
        perror("Failed to initialize pollern");
        _sched_free(new_sched);
        return errno;
    }

    if (pthread_mutex_init(&new_sched->compute_mutex, NULL) != 0) {
        perror("Failed to initialize compute_mutexn");
        _sched_free(new_sched);
        return errno;
    } 
    /*创建pipe,这样的好处是即便当前没有任何网络事件,也可以用这个pipe来唤醒调度器*/
    if (pipe(new_sched->compute_pipes) == -1) {
        perror("Failed to initialize pipen");
        _sched_free(new_sched);
        return errno;
    }

    new_sched->stack_size = sched_stack_size;

    new_sched->total_lthreads = 0;
    new_sched->default_timeout = 3000000u;
    new_sched->waiting_state = 0;
    new_sched->sleeping_state = 0;
    new_sched->sleeping = RB_ROOT;
    new_sched->birth = rdtsc();
    LIST_INIT(&new_sched->new);
    *将调度器的cpu state清空,调用lthread_run并且有lthread需要运行的话,这个st将会被设置*/
    bzero(&new_sched->st, sizeof(struct _cpu_state));

    return 0;
}

当lthread以及相关的sched的实例都创建好了,我们就可以使用lthread_run来执行了

void
lthread_run(void)
{
    sched_t *sched;
    lthread_t *lt = NULL, *lttmp = NULL;
    int p = 0;
    int fd = 0;
    int ret = 0;
    (void)ret; /* silence compiler */

    sched = lthread_get_sched();/*从系统线程中取得设置的调度器,每一个系统线程至多有一个调度器*/

    while (sched->sleeping_state ||
        !LIST_EMPTY(&sched->new) ||
        sched->waiting_state) {/*如果有lthread处于sleep状态,等待网络事件状态,或者有新建的lthread就一直执行,否则退出*/

        /*
         运行那些过期的lthread,例如某一个lthread设置10s以后再次被执行,那么如果现在已经过了10s,那么这个
         lthread就需要被执行。这个是一种设置timer的好方法
        */
        _resume_expired_lthreads(sched);

        /*运行那些被lthread_create创建的lthread实例*/
        while (!LIST_EMPTY(&sched->new)) {
            LIST_FOREACH_SAFE(lt, &sched->new, new_next, lttmp) {
                LIST_REMOVE(lt, new_next);
                _lthread_resume(lt);
            }
        }

        /* 3. resume lthreads we received from lthread_compute, if any */
        while (1) {
            pthread_mutex_lock(&sched->compute_mutex);
            lt = LIST_FIRST(&sched->compute);
            if (lt == NULL) {
                pthread_mutex_unlock(&sched->compute_mutex);
                break;
            }
            LIST_REMOVE(lt, compute_sched_next);
            pthread_mutex_unlock(&sched->compute_mutex);
            sched->sleeping_state--;
            _lthread_resume(lt);
        }
        /*向poller注册感兴趣的事件,这样当有事件发生的时候,就会得到通知*/
        register_rd_interest(sched->compute_pipes[0]);
        _lthread_poll();/*等待事件发生或者timeout*/

        /* 5. fire up lthreads that are ready to run */
        while (sched->total_new_events) {/*如果有网络事件发生就一一找到对应的lthread执行*/
            p = --sched->total_new_events;

            /* We got signaled via pipe to wakeup from polling & rusume compute.
             * Those lthreads will get handled in step 3.
             */
            fd = get_fd(&sched->eventlist[p]);
            if (fd == sched->compute_pipes[0]) {
                ret = read(fd, &tmp, sizeof(tmp));
                continue;
            }

            lt = (lthread_t *)get_data(&sched->eventlist[p]);
            if (lt == NULL)
                assert(0);

            if (is_eof(&sched->eventlist[p]))
                lt->state |= bit(LT_FDEOF);

            _desched_lthread(lt);
            _lthread_resume(lt);
        }
    }

    _sched_free(sched);

    return;
}

以上部分中,cpustate在执行过程中起到了关键的作用,当任务调度器发现有lthread需要执行的时候将会将CPU切换到需要执行Lthread的cpustate上,这个lthread执行完毕以后又会将CPU切换到调度器的cpustate上。于是调度器就继续执行剩下的事情,找寻下一个需要执行的lthread或者等待事件发生。lthread 运行示意图当没有任何需要执行的lthread存在以后调度器的运行代码就会退出(实际上是lthreadrun这个函数退出)。

Categories: programming Tags: ,

lthread代码分析(一)

March 11th, 2012 No comments

  我们的服务器程序基本上都是基于多线程的。理由很简单,需要同时处理很多请求。但是随之而来的问题也不少,有的是设计上的失误。,有的是代码上的欠缺。例如:有很多的RPC调用都是同步调用,这个就意味着调用者的当前的线程就会被一直阻塞到调用返回或者超时。一旦被调用端出现了什么问题,调用者的当前线程就不能去做其他的事情。一个最简单的解决办法就是增加线程数量,但是这并不是一个非常完美的解决方案。线程越多,系统在线程切换上面所消耗的时间也越多,而且一个系统上的线程数量也不能无休止的往上增长。

  有一天,我发现有一个python的package叫做gevent,就是在python利用coroutine实现网络框架,在网络方面主要应用了libevent(epoll),在coroutine方面使用了greenlet

  于是乎我自然想到了是否可以在c/c++上面应用这个特性呢?epoll是的有的,缺的就是一个coroutine类似的轻量级线程了。后来就在github上面发现了lthread这个东西,下面记录一下我看这个库代码的一些过程和所想所得。

int _switch(struct _cpu_state *new_state, struct _cpu_state *cur_state);
__asm__ (
"    .text                                  n"
"    .p2align 2,,3                          n"
".globl _switch                             n"
"_switch:                                   n"
"__switch:                                  n"
"movl 8(%esp), %edx      # fs->%edx         n"
"movl %esp, 0(%edx)      # save esp         n"
"movl %ebp, 4(%edx)      # save ebp         n"
"movl (%esp), %eax       # save eip         n"
"movl %eax, 8(%edx)                         n"
"movl %ebx, 12(%edx)     # save ebx,esi,edi n"
"movl %esi, 16(%edx)                        n"
"movl %edi, 20(%edx)                        n"
"movl 4(%esp), %edx      # ts->%edx         n"
"movl 20(%edx), %edi     # restore ebx,esi,edi      n"
"movl 16(%edx), %esi                                n"
"movl 12(%edx), %ebx                                n"
"movl 0(%edx), %esp      # restore esp              n"
"movl 4(%edx), %ebp      # restore ebp              n"
"movl 8(%edx), %eax      # restore eip              n"
"movl %eax, (%esp)                                  n"
"ret                                                n"
);

以上是这部分呢,就是lthread里面切换不同轻量级线程的代码了,代码本身就很好的说明了这个函数是干什么的。倒是第一次看到注释时候被这个fs->%edx  ts->%edx给弄晕了。其实前面的8 个movl就是把当前的寄存器的值保存到curstate指向的数据够中,后面的8个movl从newstate中恢复执行环境.  这个函数执行完毕以后,CPU就会执行new_state里面所指定的环境的内容。其中,执行环境主要包括 esp ebp eip(这个不能直接赋值) 。

这里罗列一下lthread里面我认为比较重要的两个数据结构_lthread和_sched

 

struct _lthread {
    struct              _cpu_state st;/*这个是当前的_lthread被切换以后保留下来的cpu执行环境,下次执行的时候需要*/
    void                (*fun)(lthread_t *lt, void *);/*_lthread的执行函数*/
    void                *arg;
    void                *data;
    size_t              stack_size;/*这个是用于lthread切换的时候指明栈空间使用的大小*/
    lt_state_t          state;
    sched_t             *sched;
    compute_sched_t     *compute_sched;
    uint64_t            timeout;
    uint64_t            ticks;
    uint64_t            birth;
    uint64_t            id;
    int                 fd_wait; /* fd we are waiting on */
    char                funcname[64];
    lthread_t           *lt_join;
    void                **lt_exit_ptr;
    LIST_ENTRY(_lthread)    new_next;
    LIST_ENTRY(_lthread)    sleep_next;
    LIST_ENTRY(_lthread)    compute_next;
    LIST_ENTRY(_lthread)    compute_sched_next;
    sched_node_t        *sched_node;
    lthread_l_t         *sleep_list;
    void                *stack;
    void                *ebp;
};
struct _sched {
    size_t              stack_size;/*调度器对应的栈大小,lthread运行的时候实际使用的是调度器所拥有的栈空间*/
    int                 total_lthreads;
    int                 waiting_state;
    int                 sleeping_state;
    int                 poller;
    int                 nevents;
    uint64_t            default_timeout;
    int                 total_new_events;
    /* lists to save an lthread depending on its state */
    lthread_l_t         new;
    lthread_l_t         compute;
    struct rb_root      sleeping;
    uint64_t            birth;
    void                *stack;
    lthread_t           *current_lthread;
    struct _cpu_state   st;/*调度器被切换以后的cpu执行环境,以便恢复以后继续进行调度工作*/
    struct epoll_event  eventlist[LT_MAX_EVENTS];
    int                 compute_pipes[2];
    pthread_mutex_t     compute_mutex;
};

[to be continue …]

Categories: programming Tags: ,

mint12的桌面总是crash

March 10th, 2012 No comments

不知道是我人品不好还是咋的,最近发现mint12的GUI总是crash,看.xsession-errors文件里面有如下提示:

gnome-session[1542]: WARNING: Application ‘gnome-shell.desktop’ killed by signal

也不知道是被啥信号给干掉的,真是郁闷,一旦gnome死掉以后,GUI操作就无法继续了。

google了一番发现这是一个已知的bug

还好有人提出一个简单的方法可以暂时让桌面回来:DISPLAY=:0.0 gnome-shell &

不过这个真的不是长久之计啊,我是不想用unity才用这个mint的啊。如果gnome老是这样crash的话我真的受不了啊

Categories: mutter Tags:

一个关于UDP数据发送错误的问题

March 8th, 2012 No comments

  为了能支持新项目的DMSCC和LSCP协议,决定增加UDP的功能。加上以前在系统上使用的是IOCP(win)/epoll(linux),所以新增加的UDP也尽量向采用这个机制,同时沿用以前的框架,那么外部对于socket server的使用基本保持一致。

  完成代码以后测试的时候发现了一个问题,就是经过一段时间的测试以后,UDP server就不在接收数据了。检查日志的时候发现原来是UDP server socket收到了一个报错,而我之前很蠢的认为如果udp server socket报错就代表这个socket不可再用了。后来才明白这个错误是因为server socket尝试发送一个udp数据包到client的时候因为client可以能已经关闭了socket,或者其他的原因造成数据包不可到达。这个时候系统会收到一个ICMP消息告知主机消息不可达。而这个时候IOCP就会告知说出现一个错误(在windows上错误码是1234)。当收到这个错误告知以后,再次调用WSARecvFrom即可继续接收新的udp消息。

udp 数据包不可达,收到一个ICMP消息

Categories: programming Tags: ,