Code前端首页关于Code前端联系我们

C++异步框架WorkFlow,为什么推荐这个开源项目?

terry 2年前 (2023-09-24) 阅读数 62 #后端开发

该框架有何特点?

用户体验相当好:界面简洁,支持常用协议,简单易用。下面我将介绍如何轻松做到;

性能好:不只是网络、磁盘IO、CPU计算等,而是工作流程焦点 由于尽可能调动了所有异步资源,所以有相当充足的测试数据来证明这个框架的性能优于当今主流服务器框架;

稳定性高:搜狗等多家公司都在用。这台发动机一定非常稳定。数据大家也可以自己查,我就不贴了;

支持多平台:该项目支持Linux、macOS、Windows、Android等操作系统。

解放用户生产力:用户只接触到两个概念:任务和任务流(系列)。框架高度封装资源,用户不需要暴露线程池、连接池、文件IO以及各种异步通知机制等,用户不需要关心内部细节,可以花更多的精力去实现复杂的业务逻辑。

全新设计理念:源码值得学习。我在阅读了这个项目的源码后推荐给大家。看完才知道原来代码可以这样写,继承也可以这样玩。

框架可以做什么?

框架可以做很多事情。我只是想介绍一些我个人认为重要的功能。有几个功能必须自己解锁。

轻松搭建服务器:不用说,如果服务器端框架搭建不了服务器,那还有什么办法,但是使用这个框架非常方便。以http服务器为例,只需要几行代码:

#include <stdio.h>
#include "workflow/WFHttpServer.h"

int main() {
    WFHttpServer server([](WFHttpTask *task) {
        task->get_resp()->append_output_body("Hello World!");
    });
    if (server.start(8888) == 0) { // start server on port 8888
        getchar(); // press "Enter" to end.
        server.stop();
    }
    return 0;
}

轻松高效地发起客户端请求:该项目号称是通用异步客户端,目前支持http、redis、mysql、websocket等卡夫卡-协议。下面是官方的mysql客户端示例:

int main(int argc, char *argv[]) {
    ...
    WFMySQLTask *task = WFTaskFactory::create_mysql_task(url, RETRY_MAX, mysql_callback);
    task->get_req()->set_query("SHOW TABLES;");
    ...
    task->start();
    ...
}以往的C++ server需要访问mysql时,可能使用的是传统的客户端。在一个线程下以同步阻塞的方式等待数据到来。如果有多个网络请求希望并发,那么用户需要管理好多个mysql cli对象。

工作流完美解决了这一系列问题,将所有此类用户请求转移到内部poller线程进行统一管理,实现高效的非阻塞IO行为,提高服务器在请求时的性能关于作为客户的数据。您不必再担心此客户端行为会影响服务器的整体性能。

再举个例子,如果想完成wget的功能,只需要在WFHttpTask的回调函数中解析http返回的消息文本即可:

int main(int argc, char *argv[]) {
    WFHttpTask *task; 
    std::string url = argv[1];
    url = "http://" + url;
    task = WFTaskFactory::create_http_task(url, REDIRECT_MAX, RETRY_MAX,
                                           wget_callback);
    protocol::HttpRequest *req = task->get_req();
    req->add_header_pair("Accept", "*/*");
    req->add_header_pair("User-Agent", "Wget/1.14 (linux-gnu)");
    req->add_header_pair("Connection", "close");
    task->start();
    wait_group.wait();
    return 0;
}

wget首先发起一个http请求,并在中处理http响应wget_callback 消息正文:

void wget_callback(WFHttpTask *task) {
    protocol::HttpRequest *req = task->get_req();
    protocol::HttpResponse *resp = task->get_resp();
    int state = task->get_state();
    int error = task->get_error();

    std::string name;
    std::string value;
    protocol::HttpHeaderCursor req_cursor(req);
    while (req_cursor.next(name, value))
        fprintf(stderr, "%s: %srn", name.c_str(), value.c_str());
    fprintf(stderr, "rn");

    /* Print response header. */
    fprintf(stderr, "%s %s %srn", resp->get_http_version(),
                                    resp->get_status_code(),
                                    resp->get_reason_phrase());
    protocol::HttpHeaderCursor resp_cursor(resp);
    while (resp_cursor.next(name, value))
        fprintf(stderr, "%s: %srn", name.c_str(), value.c_str());
    fprintf(stderr, "rn");
    /* Print response body. */
    const void *body;
    size_t body_len;
    resp->get_parsed_body(&body, &body_len);
    fwrite(body, 1, body_len, stdout);
    fflush(stdout);
    fprintf(stderr, "nSuccess. Press Ctrl-C to exit.n");
}

Wget 功能可以通过工作流程轻松完成。

支持自定义协议客户端/服务器:用户可以构建自己的RPC系统。搜狗有一个开源项目srpc就是基于这个框架实现的。

可构建异步任务流:支持串行连接,支持并行连接,支持串并行组合,还支持复杂的DAG结构。

异步IO:可以作为Linux系统下的文件异步IO工具,性能超越所有标准调用。

通信与计算融合:大多数框架注重网络IO的效率,而计算和任务调度则必须由用户自己实现。该工作流会自动调度任务并开放网络和磁盘资源,特别适合需要网络通信的重型计算模块。

为什么推荐这个项目?

最主要的是:值得学习,适合C++开发者进阶。那么你实际上应该学习什么?

学习系统设计,所谓初级重在实现,中级重在炫技,高级重在设计。

在作者的设计理念中,所有的业务逻辑都是一个任务。多个任务将形成一个任务流。任务流程可以形成图表。该图可以是串联图、并联图或串并联图。类似这样: C++异步框架WorkFlow,为什么要推荐这个开源项目?

也可以是像这样的复杂DAG图: C++异步框架WorkFlow,为什么要推荐这个开源项目?

当然,图的层次结构可以由用户自定义。我个人认为该框架最神奇的地方在于它支持动态创建任务流。

使用以下代码,您可以以非常直观和友好的方式构建图的结构。好奇这段代码是如何实现的吗?

WFGraphNode a, b, c, d;
a-->b;
a-->c;
b-->d;
c-->d;

我会在这里尝试一下。如果你有兴趣的话,你可以自己去看看。总是贴别人的代码是不好的。

我觉得这个项目最值得大家了解的就是架构的设计,特殊任务的设计以及任务流程。我还没有读过代码,所以无法画出架构的设计图(我怕我写错了),所以只能画一个大概的轮廓。我不得不说这太棒了,因为它真的让我感到惊讶。

贴一张任务工作流程架构图:C++异步框架WorkFlow,为什么要推荐这个开源项目?

然后贴一张任务执行时序代码给大家看看:

项目中的所有任务都是通过工厂创建的:

static WFTimerTask *create_timer_task(const std::string& timer_name,
                                          unsigned int microseconds,
                                          timer_callback_t callback);

看一下设计看一下WFTimerTask:

class WFTimerTask : public SleepRequest {
public:
    void start() {
        assert(!series_of(this));
        Workflow::start_series_work(this, nullptr);
    }
    void dismiss() {
        assert(!series_of(this));
        delete this;
    }
protected:
    virtual SubTask *done() {
        if (this->callback)
            this->callback(this);
    }
};

看Workflow::start_series_work()方法:

inline void Workflow::start_series_work(SubTask *first, series_callback_t callback) {
    new SeriesWork(first, std::move(callback));
    first->dispatch();
}

然后SleepRequest:

class SleepRequest : public SubTask, public SleepSession {
public:
    virtual void dispatch() {
        if (this->scheduler->sleep(this) < 0) {
            this->state = SS_STATE_ERROR;
            this->error = errno;
            this->subtask_done();
        }
    }
protected:
    CommScheduler *scheduler;
    virtual void handle(int state, int error) {
        this->state = state;
        this->error = error;
        this->subtask_done();
    }
};

看调度器中的sleep()方法:

subtask_done()方法的执行:

void SubTask::subtask_done() {
    SubTask *cur = this;
    ParallelTask *parent;
    SubTask **entry;
    while (1) {
        parent = cur->parent;
        entry = cur->entry;
        cur = cur->done();
        xxx
        break;
    }
}

然后是SleepSession:

class SleepSession {
private:
    virtual int duration(struct timespec *value) = 0;
    virtual void handle(int state, int error) = 0;
public:
    virtual ~SleepSession() { }
    friend class Communicator;
};

看了这么多源码,WFTimerTask是如何实现定时器功能的呢?

我总结了以下步骤:

● 步骤一

用户调用WFTimerTask的start();

● 第二步

start() 调用 Workflow::start_series_work() 方法;

● 第三步

start_series_work()调用SubTask的dispatch()方法。这个dispatch()方法是由SubTask子类SleepRequest(WFTimerTask的父类)实现的;

● 步骤4

SleepRequest类的Dispatch()方法会调用scheduler->sleep()方法;

● 步骤5

sleep()方法调用SleepSession的duration()方法来获取具体的睡眠时间。框架内部使用timerfd将超时时间传递给操作系统。时间到了,会通知框架层,然后触发SleepSession中的handle()调用;

● 第六步,在

handle()的实现中会调用subtask_done()方法;

● 第七步,

subtask_done()会调用SubTask()方法中的done;

● 步骤8

这个done()方法是WFTimerTask专门覆盖的,实现中会调用一定时间后触发的回调函数。

乍一看似乎很麻烦。为什么需要这么多的继承关系来实现一个通用的时间函数呢?但当你真正看源码后,你会发现项目抽象出来的所有任务,比如计数器任务、文件IOTasks、网络任务、MySQLTask等都是以 SubTask、XXXRequest 和 XXXSession 的形式实现的。稍后,XXXTask可以很容易地扩展。这才是一个优秀的项目应该具备的结构。我真的很佩服这一点。

读者们,你能设计出这么先进的架构吗?不管怎样,我会把这个项目完成并推荐给大家一起学习。

最后总结一下我个人认为这个项目值得学习的地方:

小总结

界面设计:项目的做法极其简单。您只需几行代码即可构建客户端或服务器。几行代码也可以构建一个简单的任务流程图,可以用来处理复杂的业务逻辑;

架构设计:项目中的各个类是如何衍生出来的,作者的设计思想是什么? ;

网络通信:该项目不使用任何网络框架,而是使用裸网络接口进行网络通信。我们都知道,在大型项目中使用裸网络接口进行网络通信需要处理很多异常情况。这里值得学习;

任务流程封装:为什么可以动态构建任务流程串并图,并在项目内灵活调度?

文件I/O:该项目声称内部文件I/O操作比标准调用具有更好的性能。它是如何做到的?

内存管理:项目没有使用智能指针,但是可以很好的处理内存问题。这是一项技术活,当然也得益于这种优秀的架构设计。

Github地址https://github.com/sogou/workflow

Gitee地址:

版权声明

本文仅代表作者观点,不代表Code前端网立场。
本文系作者Code前端网发表,如需转载,请注明页面地址。

发表评论:

◎欢迎参与讨论,请在这里发表您的看法、交流您的观点。

热门