Code前端首页关于Code前端联系我们

与Github的webhook实现类似,

terry 2年前 (2023-09-27) 阅读数 74 #数据结构与算法

Webhook是一个非常强大的推送机制。熟悉WordPress的同学可以比较构建WP生态系统的不同hook功能。Githubt允许开发人员通过Webhooks监控存储库中的更改,以触发持续集成工具(例如Travis CI)的操作。

要求

大家都在Github上看过webhook。您可以为特定存储库设置 Webhook 来监控存储库更改,例如推送、page_build 和其他事件(X-GitHub-Event)。

类似Github的webhook实现 每次提交都会有一个 uuid 作为标签,并写入 X-GitHub 传递的 HTTP 标头。要查看失败的提交历史记录,请单击“重新发送”重新发送。

类似Github的webhook实现

如果该功能作为单独的服务提供商使用,其基本要求是准确记录该服务与互联网之间的任何网络交互过程,包括请求数据和响应结果数据的发送。继续细化,

  1. 服务提供者接收客户端的调用,将请求发送到客户端指定的URL,并接收响应。
  2. 记录每个客户端的原始请求内容(url、method、header、body)和响应(header、body、code等)
  3. 必须考虑到客户端的重试或重复调用,需要记录每通话次数请求和最后一次调用的时间。 (当客户端调用时,可以发送一个客户端ID到接收端进行去重,如果没有,服务提供商会根据请求生成一个唯一的uuid)
  4. 提供接口重新发送指定的调度历史记录。

思考

有了需求,首先思考这个服务要和哪些系统交互?

  1. 请求必须发送到指定的URL,那么第一个交互的系统就是公网服务。
  2. 发送历史必须保留,表示数据必须保留。另一个与之交互的系统是数据库。

好,交互系统确定之后,接下来就要考虑顺序了。我们应该先将请求发送到公共网络服务还是先服务数据库?我们来一一分析

  • 方案A:先发送请求,再将记录写入数据库。问题:如果发送请求但数据库写入失败,会因为发送历史丢失而出现数据不一致的情况。
  • 选项B:先写入数据库,然后发送请求。问题:与A类似,如果数据库写入成功,由于网络中断等原因导致请求无法发送。此时数据也会不一致。虽然有发送历史记录,但发送本身失败。
  • 选项C:先写入数据库,然后发送请求,最后更新数据。这种方案相对A、B来说更加可靠。第一步,写入请求的数据,并将状态(status)设置为sending。发送完成后,状态会更新为成功失败
  • 选项D:先写入数据库,将状态设置为正在发送,启动新线程扫描表,发送状态为ready的记录。发送完成后,将状态更新为成功失败

前两种选择绝对不可取。让我们分析一下最后两个选项的优缺点。方案C的缺点是第一步写入数据库完成后,发送请求时系统宕机,记录会一直处于send状态。幸运的是,总体计划将提供手动重试机制(点击 Redeliver),可以弥补后者。优点是序列化的心理编码更容易。方案D的优点是,因为有线程不断扫描,所以可以自动重新发布一直处于send状态的历史记录。缺点是此扫描线程会增加数据库的负载。如果要并行扫描,需要解决任务切片和编排的问题(参见elastic-job),编码相对困难。

综合以上的优缺点,我们最终选择了方案C。其实,方案D不是通过网络请求向MQ发送消息,而是和大家熟知的“本地事务表”方案非常相似,都是一个将 MQ 事务与本地数据库事务绑定的想法。

实现

数据结构并定义请求和响应

@Data
@NoArgsConstructor
public class WebHookRequest {

    @NotBlank
    private String url;

    private String method;

    private Map<String, String> headers;

    private String body;
}

@Data
@NoArgsConstructor
@AllArgsConstructor
public class WebHookResponse {

    private String id;
    private String data;
    private int code;

    public boolean isSuccessful() {
        return this.code >= 200 && this.code < 300;
    }

}

发送方法

public WebHookResponse send(WebHookRequest webHookRequest, String id, boolean retryOnServerError) {
    //如果traceId为空则根据请求参数生成一个md5的值作为traceId
    if ((id)) {
        //拼接请求参数
        String content = ()
                .concat(())
                .concat((()))
                .concat(());
        id = (content);
    }
        // 查找发送记录
    WebHookRecord webHookRecord = (id, ());
    if (webHookRecord == null) {
        webHookRecord = createFromRequest(webHookRequest);
        (id);
        try {
                        // 新建发送记录
            (webHookRecord);
        } catch (Exception ex) {
            ("Duplicate key for [{}]", ());
            throw new BizException(B_01000, ex);
        }
    } else {
                // 如果历史已经存在,可能是客户端发送重发请求,那么先判断是否可以重发,sending状态在一定时间间隔内不能重发,避免频繁失败
        if (((), ())) {
            // 可以重发那么更新状态为sending
                        this.updateResendStatus(webHookRecord);
        } else {
            ("Should not resend key for [{}]", ());
            throw new BizException(B_01000);
        }
    }
        // 通过网络发送请求,如果失败会直接更新status为failure并抛出异常,发送过程的异常和得到响应对方服务报异常还是不一样的
    WebHookResponse webHookResponse = (id, webHookRequest);
        // 发送完成后更新status
    this.onResponse(id, webHookResponse);
    if (retryOnServerError && () >= 500) {
        throw new RetryException("server error!");
    }
    return webHookResponse;
}

/*
* 判断是否可以进行重发
*/
private boolean shouldResend(DateTime sendTime, String status) {
    if (!(status)) {
        return true;
    }
    // 发送状态超过60s可重发
    return sendTime.plusSeconds(60).isBefore(());
}

/*
* 调用网络接口进行发送
*/
private WebHookResponse doSend(String id, WebHookRequest webHookRequest) {
    //调用httpClient
    String responseBodyString = null;
    Response response;
    try {
        response = ((),
                (),
                (),
                ());
        if (() != null) {
            responseBodyString = ().string();
        }
    } catch (Exception ex) {
        (id, (ex));
        throw new RetryException("http send error!", ex);
    }
    return new WebHookResponse(id, responseBodyString, ());
}

/*
* 请求正常返回后的处理
* 这里有一个乐观锁的问题,如果同时有多个线程调用改办法修改同一个历史请求,只有一个线程会更新成功
*/
private void onResponse(String id, WebHookResponse webHookResponse) {
    WebHookRecord webHookRecord = (id, ());
    (());
    Code(());
    //更新状态
    if (()) {
        (STATUS_SUCCESS);
    } else {
        (STATUS_ERROR);
    }
    int count = (webHookRecord);
    if (count == 0) {
        ("Attempt to update WebHook id={} with wrong version ({})", id, ());
    }
}

/*
* 再次发送时更新状态 (真正发送网络请求前)
*/
private void updateResendStatus(WebHookRecord webHookRecord) {
    (STATUS_SENDING);
    (());
    int count = (webHookRecord);
    if (count == 0) {
        throw new OptimisticLockingFailureException("Attempt to update WebHook with wrong version (" + () + ")");
    }
}

为了避免多个线程同时开始重试同一请求的问题,我们使用updateResendStatus方法 使用乐观锁。如果其中一个线程更新状态成功,其他线程就会因为乐观锁问题直接失败,不会走到真正发送网络请求的那一步。也就是说,大多数并发问题在网络请求发送之前就被过滤掉了。

最后,可以使用单元测试来模拟并发请求进行验证。

@Test
public void testMultiThreads() throws InterruptedException {
    //调用send接口
    WebHookRequest webHookRequest = new WebHookRequest();
    ("");
    ("POST");
    Map<String, String> headers = new HashMap<>(1);
    (headers);

    int nLoop = 100;
    String clientId = ().toString();
    CountDownLatch countDownLatch = new CountDownLatch(nLoop);
    Runnable task = () -> {
        try {
            givenToken().when().body(webHookRequest).post("/webhooks/send?clientId=" + clientId)
                    .then()
                    .statusCode(())
                    .extract()
                    .response();
        } finally {
            ();
        }
    };
    ExecutorService executorService = new ThreadPoolBuilder.FixedThreadPoolBuilder().setThreadNamePrefix("thread-webhook").setPoolSize(100).build();
    for (int i = 0; i < nLoop; i++) {
        (task);
    }
    ();

    int times = (clientId, 1L).getTimes();
        // 验证数据库里记录的发送次数是否 等于 真正调用发送网络请求接口的次数
    (webHookIntegrationService, (times)).send((),
            (), (), ());
}

概述

如果想进一步提升性能,可以使用支持异步处理的httpclient工具包onResponse并在回调中进行处理。

总的来说,这是一个很简单的小要求,但实际上需要花很大的功夫去想清楚。本质是异构系统之间的数据一致性问题。当我们将发送网络请求改为将数据写入redis、写入MQ以及写入另一个微服务时,我们会发现它们之间的共性。当一个请求涉及多个系统并且无法包装到同一个事务中时,就会出现此问题。至于解决方案是两步提交、事后补偿还是自动对账,请根据自己的业务特点进行选择。

版权声明

本文仅代表作者观点,不代表Code前端网立场。
本文系作者Code前端网发表,如需转载,请注明页面地址。

热门