与Github的webhook实现类似,
Webhook是一个非常强大的推送机制。熟悉WordPress的同学可以比较构建WP生态系统的不同hook功能。Githubt允许开发人员通过Webhooks监控存储库中的更改,以触发持续集成工具(例如Travis CI)的操作。
要求
大家都在Github上看过webhook。您可以为特定存储库设置 Webhook 来监控存储库更改,例如推送、page_build 和其他事件(X-GitHub-Event)。
每次提交都会有一个 uuid 作为标签,并写入 X-GitHub 传递的 HTTP 标头。要查看失败的提交历史记录,请单击“重新发送”重新发送。
![]()
如果该功能作为单独的服务提供商使用,其基本要求是准确记录该服务与互联网之间的任何网络交互过程,包括请求数据和响应结果数据的发送。继续细化,
- 服务提供者接收客户端的调用,将请求发送到客户端指定的URL,并接收响应。
- 记录每个客户端的原始请求内容(url、method、header、body)和响应(header、body、code等)
- 必须考虑到客户端的重试或重复调用,需要记录每通话次数请求和最后一次调用的时间。 (当客户端调用时,可以发送一个客户端ID到接收端进行去重,如果没有,服务提供商会根据请求生成一个唯一的uuid)
- 提供接口重新发送指定的调度历史记录。
思考
有了需求,首先思考这个服务要和哪些系统交互?
- 请求必须发送到指定的URL,那么第一个交互的系统就是公网服务。
- 发送历史必须保留,表示数据必须保留。另一个与之交互的系统是数据库。
好,交互系统确定之后,接下来就要考虑顺序了。我们应该先将请求发送到公共网络服务还是先服务数据库?我们来一一分析
- 方案A:先发送请求,再将记录写入数据库。问题:如果发送请求但数据库写入失败,会因为发送历史丢失而出现数据不一致的情况。
- 选项B:先写入数据库,然后发送请求。问题:与A类似,如果数据库写入成功,由于网络中断等原因导致请求无法发送。此时数据也会不一致。虽然有发送历史记录,但发送本身失败。
- 选项C:先写入数据库,然后发送请求,最后更新数据。这种方案相对A、B来说更加可靠。第一步,写入请求的数据,并将状态(status)设置为
sending。发送完成后,状态会更新为成功或失败。 - 选项D:先写入数据库,将状态设置为
正在发送,启动新线程扫描表,发送状态为ready的记录。发送完成后,将状态更新为成功或失败。
前两种选择绝对不可取。让我们分析一下最后两个选项的优缺点。方案C的缺点是第一步写入数据库完成后,发送请求时系统宕机,记录会一直处于send状态。幸运的是,总体计划将提供手动重试机制(点击 Redeliver),可以弥补后者。优点是序列化的心理编码更容易。方案D的优点是,因为有线程不断扫描,所以可以自动重新发布一直处于send状态的历史记录。缺点是此扫描线程会增加数据库的负载。如果要并行扫描,需要解决任务切片和编排的问题(参见elastic-job),编码相对困难。
综合以上的优缺点,我们最终选择了方案C。其实,方案D不是通过网络请求向MQ发送消息,而是和大家熟知的“本地事务表”方案非常相似,都是一个将 MQ 事务与本地数据库事务绑定的想法。
实现
数据结构并定义请求和响应
@Data
@NoArgsConstructor
public class WebHookRequest {
@NotBlank
private String url;
private String method;
private Map<String, String> headers;
private String body;
}
@Data
@NoArgsConstructor
@AllArgsConstructor
public class WebHookResponse {
private String id;
private String data;
private int code;
public boolean isSuccessful() {
return this.code >= 200 && this.code < 300;
}
}发送方法
public WebHookResponse send(WebHookRequest webHookRequest, String id, boolean retryOnServerError) {
//如果traceId为空则根据请求参数生成一个md5的值作为traceId
if ((id)) {
//拼接请求参数
String content = ()
.concat(())
.concat((()))
.concat(());
id = (content);
}
// 查找发送记录
WebHookRecord webHookRecord = (id, ());
if (webHookRecord == null) {
webHookRecord = createFromRequest(webHookRequest);
(id);
try {
// 新建发送记录
(webHookRecord);
} catch (Exception ex) {
("Duplicate key for [{}]", ());
throw new BizException(B_01000, ex);
}
} else {
// 如果历史已经存在,可能是客户端发送重发请求,那么先判断是否可以重发,sending状态在一定时间间隔内不能重发,避免频繁失败
if (((), ())) {
// 可以重发那么更新状态为sending
this.updateResendStatus(webHookRecord);
} else {
("Should not resend key for [{}]", ());
throw new BizException(B_01000);
}
}
// 通过网络发送请求,如果失败会直接更新status为failure并抛出异常,发送过程的异常和得到响应对方服务报异常还是不一样的
WebHookResponse webHookResponse = (id, webHookRequest);
// 发送完成后更新status
this.onResponse(id, webHookResponse);
if (retryOnServerError && () >= 500) {
throw new RetryException("server error!");
}
return webHookResponse;
}
/*
* 判断是否可以进行重发
*/
private boolean shouldResend(DateTime sendTime, String status) {
if (!(status)) {
return true;
}
// 发送状态超过60s可重发
return sendTime.plusSeconds(60).isBefore(());
}
/*
* 调用网络接口进行发送
*/
private WebHookResponse doSend(String id, WebHookRequest webHookRequest) {
//调用httpClient
String responseBodyString = null;
Response response;
try {
response = ((),
(),
(),
());
if (() != null) {
responseBodyString = ().string();
}
} catch (Exception ex) {
(id, (ex));
throw new RetryException("http send error!", ex);
}
return new WebHookResponse(id, responseBodyString, ());
}
/*
* 请求正常返回后的处理
* 这里有一个乐观锁的问题,如果同时有多个线程调用改办法修改同一个历史请求,只有一个线程会更新成功
*/
private void onResponse(String id, WebHookResponse webHookResponse) {
WebHookRecord webHookRecord = (id, ());
(());
Code(());
//更新状态
if (()) {
(STATUS_SUCCESS);
} else {
(STATUS_ERROR);
}
int count = (webHookRecord);
if (count == 0) {
("Attempt to update WebHook id={} with wrong version ({})", id, ());
}
}
/*
* 再次发送时更新状态 (真正发送网络请求前)
*/
private void updateResendStatus(WebHookRecord webHookRecord) {
(STATUS_SENDING);
(());
int count = (webHookRecord);
if (count == 0) {
throw new OptimisticLockingFailureException("Attempt to update WebHook with wrong version (" + () + ")");
}
}为了避免多个线程同时开始重试同一请求的问题,我们使用updateResendStatus方法 使用乐观锁。如果其中一个线程更新状态成功,其他线程就会因为乐观锁问题直接失败,不会走到真正发送网络请求的那一步。也就是说,大多数并发问题在网络请求发送之前就被过滤掉了。
最后,可以使用单元测试来模拟并发请求进行验证。
@Test
public void testMultiThreads() throws InterruptedException {
//调用send接口
WebHookRequest webHookRequest = new WebHookRequest();
("");
("POST");
Map<String, String> headers = new HashMap<>(1);
(headers);
int nLoop = 100;
String clientId = ().toString();
CountDownLatch countDownLatch = new CountDownLatch(nLoop);
Runnable task = () -> {
try {
givenToken().when().body(webHookRequest).post("/webhooks/send?clientId=" + clientId)
.then()
.statusCode(())
.extract()
.response();
} finally {
();
}
};
ExecutorService executorService = new ThreadPoolBuilder.FixedThreadPoolBuilder().setThreadNamePrefix("thread-webhook").setPoolSize(100).build();
for (int i = 0; i < nLoop; i++) {
(task);
}
();
int times = (clientId, 1L).getTimes();
// 验证数据库里记录的发送次数是否 等于 真正调用发送网络请求接口的次数
(webHookIntegrationService, (times)).send((),
(), (), ());
}概述
如果想进一步提升性能,可以使用支持异步处理的httpclient工具包onResponse并在回调中进行处理。
总的来说,这是一个很简单的小要求,但实际上需要花很大的功夫去想清楚。本质是异构系统之间的数据一致性问题。当我们将发送网络请求改为将数据写入redis、写入MQ以及写入另一个微服务时,我们会发现它们之间的共性。当一个请求涉及多个系统并且无法包装到同一个事务中时,就会出现此问题。至于解决方案是两步提交、事后补偿还是自动对账,请根据自己的业务特点进行选择。
版权声明
本文仅代表作者观点,不代表Code前端网立场。
本文系作者Code前端网发表,如需转载,请注明页面地址。
code前端网