workflow的任务模型

描述

今天,想聊聊workflow这个开源项目。

关于workflow,我之前特意写过一篇文章【推荐学习这个C++开源项目】。

今天还是想再啰嗦啰嗦,因为自己这一年也在带团队从0到1做项目,需要负责整个项目的架构设计、接口设计、模块划分等工作。

做了一段时间后再回过头复盘一下,深知架构设计、接口设计的重要性,也感受到了架构设计的困难程度,编码和设计相比,真的容易的多了。

然后自己又回头来研究了一下workflow,想着学习下其他项目的设计理念,随着自己研究的越来越深入,越来越感觉它的高端,对外暴露特别简单的接口却能完成非常复杂的功能。

上篇文章是基础篇,主要向大家普及一下workflow的特点和作用,感兴趣的朋友可以移步到那里哈。

本篇文章是进阶篇,主要就是想介绍下workflow的任务模型,其他的框架一般只能处理普通的网络通信,而workflow却特别适用于通信与计算关系很复杂的应用。其实我最感兴趣的是它的内存管理机制,下面也会详细介绍。

框架

框架

优秀的系统设计

框架

框架

在作者的设计理念中:程序 = 协议 + 算法 + 任务流。

**协议:**就是指通用的网络协议,比如http、redis等,当然还可以自定义网络协议,这里只需要提供序列化和反序列化函数就可以达到想要的效果。

算法: workflow提供了一些通用的算法,比如sort、merge、reduce等,当然还可以自定义算法,用过C++ STL的朋友应该都知道如何自定义算法吧,在workflow中,任何复杂的计算都应该包装成算法。

**任务流:**我认为这应该就是整个系统设计的核心,通过任务流来抽象封装实际的业务逻辑,就是把开发好的协议和算法组成一个任务流程图,然后调度执行这个图。

框架

框架

任务流

框架

框架

这里多聊聊任务流,在workflow中,一切业务逻辑皆是任务,多个任务会组成任务流,任务流可组成图,这个图可能是串联图,可能是并联图,也有可能是串并联图,类似于这种:

框架

也可能是这种复杂的DAG图:

框架

图的层次结构可以由用户自定义,框架也是支持动态地创建任务流。

引用作者的一句话:

框架

框架

如果把业务逻辑想象成用设计好的电子元件搭建电路,那么每个电子元件内部可能又是一个复杂电路。

workflow对任务做了很好的抽象和封装。整个系统包含6种基础任务:通讯、文件IO、CPU、定时器、计数器。

workflow提供了任务工厂,所有的任务都由任务工厂产生,并且会自动回收。

大多数情况下,通过任务工厂创建的任务都是一个复合任务,但用户并不感知。例如一个http请求,可能包含很多次异步过程(DNS,重定向),内部有很多复杂的任务,但对用户来讲,这就是一次简单的通信任务。

哪有什么岁月静好,只不过是有人替你负重前行。workflow的一大特点就是接口暴露的特别简洁,非常方便用户的接入,外部接入如此简单,肯定是将很多组合的逻辑都放在了内部,但其实workflow项目内部代码结构层次非常简洁清晰,感兴趣的朋友可以自己研究研究哈。

框架

框架

内存管理机制

框架

框架

还有就是项目的内存管理机制,workflow整个项目都遵循着谁申请谁释放的原则,内部申请的内存不需要外部显式释放,内部会自动回收内存。

而且整个项目都没有使用shared_ptr,那多个对象共同使用同一块裸内存,workflow是怎么处理的呢?

内部为这种需要共享的对象各自维护一个引用计数,手动incref和decref,至于为什么要这样做,可以看看workflow美女架构师的回答【https://www.zhihu.com/question/33084543/answer/2209929271】。

我总结了一下:

  • shared_ptr是非侵入式指针,一层包一层,而且为了保持shared_ptr覆盖对象整个生命周期,每次传递时都必须带着智能指针模板,使用具有传染性,而且也比较麻烦。
  • shared_ptr引用计数的内存区域和数据区域不一致,不连续,缓存失效可能导致性能问题,尽管有make_shared,但还是容易用错。
  • 手动为对象做incref和decref,使用起来更灵活,可以明确在引用计数为固定数字时做一些自定义操作,而且方便调试。因为手动管理对象的引用计数,就会要求开发者明晰对象的生命周期,明确什么时候该使用对象,什么时候该释放对象。
  • 如果使用shared_ptr可能会激起开发者的惰性,反正也不需要管理内存啦,就无脑使用shared_ptr呗,最后出现问题时调试起来也比较困难。

那再深入源码中研究研究,看看workflow是如何做到把对象指针给到外部后,内部还自动回收的。

拿WFClientTask举例说明一下,workflow中所有的Task都是通过Factory创建:

static WFHttpTask *create_http_task(const std::string& url,
                    int redirect_max,
                    int retry_max,
                    http_callback_t callback);


using WFHttpTask = WFNetworkTask;

template <class REQ, class RESP>
class WFClientTask : public WFNetworkTask {};

注意,create参数里有一个callback,workflow一定会执行这个callback,然后内部回收掉该WFClientTask占用的内存,任何任务的生命周期都是从创建到callback函数结束。

它是怎么做到的?继续看下WFClientTask的继承层次结构:

template <class REQ, class RESP>
class WFClientTask : public WFNetworkTask {};


template<class REQ, class RESP>
class WFNetworkTask : public CommRequest {};


class CommRequest : public SubTask, public CommSession {};


class SubTask {
public:
  virtual void dispatch() = 0;
private:
  virtual SubTask *done() = 0;
protected:
  void subtask_done();
};

WFClientTask继承于WFNetworkTask,WFNetworkTask又继承于SubTask。

SubTask内部有subtask_done()方法,看下它的实现:

void SubTask::subtask_done() {
  SubTask *cur = this;
  ParallelTask *parent;
  SubTask **entry;


  while (1) {
    parent = cur->parent;
    entry = cur->entry;
    cur = cur->done();
    if (cur) {
      cur->parent = parent;
      cur->entry = entry;
      if (parent)
        *entry = cur;
      cur->dispatch();
    }
    else if (parent) {
      if (__sync_sub_and_fetch(&parent->nleft, 1) == 0) {
        cur = parent;
        continue;
      }
    }
    break;
  }
}

subtask_done()方法中会调用它的done()方法,然而这几个方法都是virtual方法,看看WFClientTask是怎么重写它们的:

template <class REQ, class RESP>
class WFClientTask : public WFNetworkTask<REQ, RESP> {
protected:
  virtual SubTask *done() {
    SeriesWork *series = series_of(this);
    if (this->state == WFT_STATE_SYS_ERROR && this->error < 0) {
      this->state = WFT_STATE_SSL_ERROR;
      this->error = -this->error;
    }
    if (this->callback)
      this->callback(this);
    delete this;
    return series->pop();
  }
};

子类重写了done()方法,可以看到在它的实现里,触发了callback,然后调用了delete this,释放掉了当前占用的这块内存。

那谁调用的done(),可以看下上面的代码,subtask_done()会触发done(),那谁触发的subtask_done():

void CommRequest::dispatch() {
  if (this->scheduler->request(this, this->object, this->wait_timeout,
                 &this->target) < 0) {
    this->state = CS_STATE_ERROR;
    this->error = errno;
    if (errno != ETIMEDOUT)
      this->timeout_reason = TOR_NOT_TIMEOUT;
    else
      this->timeout_reason = TOR_WAIT_TIMEOUT;
    this->subtask_done();
  }
}

可以看到,dispatch()里触发了subtask_done(),那谁触发的dispatch():

template<class REQ, class RESP>
class WFNetworkTask : public CommRequest {
public:
  /* start(), dismiss() are for client tasks only. */
  void start() {
    assert(!series_of(this));
    Workflow::start_series_work(this, nullptr);
  }
};


inline void
Workflow::start_series_work(SubTask *first, SubTask *last,
              series_callback_t callback) {
  SeriesWork *series = new SeriesWork(first, std::move(callback));
  series->set_last_task(last);
  first->dispatch();
}

这里可以看到,Task的start()方法触发start_series_work(),进而触发dispatch()方法。

总结一下:

● 步骤一

通过工厂方法创建WFClientTask,同时设置callback;

● 步骤二

外部调用start()方法,start()中调用Workflow::start_series_work()方法;

● 步骤三

start_series_work()中调用SubTask的dispatch()方法,这个dispatch()方法由SubTask的子类CommRequest(WFClientTask的父类)实现;

● 步骤四

dispatch()方法在异步操作结束后会触发subtask_done()方法;

● 步骤五

subtask_done()方法内会触发done()方法;

● 步骤六

done()方法内会触发callback,然后delete this;

● 步骤七

内存释放完成。

其实这块可以猜到,想要销毁自己的内存,基本上也就delete this这个方法。

然而我认为这中间调用的思想和过程才是我们需要重点关注和学习的,远不止我上面描述的这么简单,感兴趣的朋友可以自己去研究研究哈。

关于workflow还有很多优点,这里就不一一列举啦,它的源码也值得我们CPP开发者学习和进阶,具体可以看我之前的文章。

发现workflow团队对这个项目相当重视,还特意建了个QQ交流群(群号码是618773193),对此项目有任何问题都可以在这个群里探讨,也方便了我们学习,真的不错。项目地址如下:https://github.com/sogou/workflow,也可以点击阅读原文直达。

在访问GitHub遇到困难时,可使用他们的Gitee官方仓库:https://gitee.com/sogou/workflow。

项目也特别实用,据说搜狗内外很多团队都在使用workflow。感觉这个项目值得学习的话就给人家个star,不要白嫖哈,对项目团队来说也是一种认可和鼓励。

打开APP阅读更多精彩内容
声明:本文内容及配图由入驻作者撰写或者入驻合作网站授权转载。文章观点仅代表作者本人,不代表电子发烧友网立场。文章及其配图仅供工程师学习之用,如有内容侵权或者其他违规问题,请联系本站处理。 举报投诉

全部0条评论

快来发表一下你的评论吧 !

×
20
完善资料,
赚取积分