前一篇 Dapr-服务调用 对Dapr的服务调用方式进行了解,本篇继续对状态管理进行了解。
在分布式应用程序中跟踪状态存在一下问题:
应用程序可能需要不同类型的数据存储。
访问和更新数据时可能需要不同的一致性级别。
多个用户可以同时更新数据,需要冲突解决。
在与数据存储交互时,服务必须重试发生的任何短暂的暂时性错误 。
Dapr 状态管理构建块解决了这些难题。它简化了跟踪状态,而无需依赖关系或第三方存储 Sdk 上的学习曲线。
Dapr 状态管理提供 密钥/值 API。此功能不支持关系数据存储或图形数据存储。
应用程序可以使用Dapr的状态管理API,使用状态存储组件保存和读取键/值对。通过 HTTP 或 gRPC 调用 API。
例如,通过使用HTTP POST可以保存键/值对,通过使用HTTP GET可以读取一个键并返回它的值。
Dapr数据存储被建模为组件,可以在不修改你的服务代码的情况下进行替换。
CAP 定理是一组适用于存储状态的分布式系统的原则。下图展示了CAP定理的三个属性。
定理指出,分布式数据系统提供一致性、可用性和分区容差之间的权衡。而且,任何数据存储只能 保证三个属性中的两个:
一致性 (C) 。群集中的每个节点都将使用最新的数据做出响应,即使在所有副本更新之前,系统都必须阻止请求。如果查询当前正在更新的项的 "一致性系统",则在所有副本都成功更新之前,将不会收到响应。但是,您将始终接收最新的数据。
可用性 () 。即使该响应不是最新的数据,每个节点都将返回立即响应。如果您在 "可用系统" 中查询正在更新的项,则您将获得该服务在此时可以提供的最佳可能的答案。
分区容差 (P) 。即使复制的数据节点发生故障或失去与其他复制的数据节点的连接,保证系统仍可继续运行。
分布式应用程序必须处理 P 属性。随着服务彼此间的网络调用通信,会发生网络中断 (P) 。考虑到这一点,分布式应用程序必须是 AP 或 CP。
AP 应用程序选择 "可用性一致性"。Dapr 通过其 最终一致性 策略支持此选择。请考虑使用基础数据存储(例如 Azure CosmosDB)将冗余数据存储在多个副本上。对于最终一致性,状态存储会将更新写入一个副本并完成与客户端的写入请求。此时间过后,存储将以异步方式更新其副本。读取请求可以返回任何副本的数据,包括尚未收到最新更新的副本。
CP 应用程序选择一致性和可用性。Dapr 通过其 强一致性 策略支持此选择。在此方案中,状态存储将同步更新 所有 (或在某些情况下,在完成写入请求 之前) 必需副本的 仲裁。读取操作将跨副本持续返回最新数据。
在多用户应用程序中,有可能多个用户同时更新同一时间) (相同的数据。Dapr 支持乐观并发控制 (OCC) 来管理冲突。OCC 基于一个假设,因为用户处理数据的不同部分,所以更新冲突很少见。更有效的方法是将更新成功,如果不成功,则重试。实现悲观锁定的替代方法可能会影响长时间运行的锁定,导致数据争用。
Dapr 支持使用 Etag) (OCC 的乐观并发控制。ETag 是与存储的键/值对的特定版本相关联的值。键/值对的每次更新时,ETag 值也会更新。当客户端检索键/值对时,响应包括当前 ETag 值。当客户端更新或删除键/值对时,它必须在请求正文中发送回该 ETag 值。如果其他客户端同时更新了数据,则 Etag 不会匹配,请求将失败。此时,客户端必须检索更新的数据,重新进行更改,然后重新提交更新。此策略称为 第一次写入-wins。
Dapr 还支持 最后写入 wins 策略。使用此方法时,客户端不会将 ETag 附加到写入请求。状态存储组件将始终允许更新,即使基础值在会话期间已更改也是如此。最后写入-wins 对于数据争用较少的高吞吐量写入方案非常有用。同样,可以容忍偶尔的用户更新。
Dapr 可以将 多项更改 作为一个作为事务实现的操作写入数据存储区。此功能仅适用于支持 ACID 事务的数据存储。在撰写本文时,这些存储包括 Redis、MongoDB、PostgreSQL、SQL Server和 Azure CosmosDB。
在下面的示例中,多项操作将发送到单个事务中的状态存储。所有操作都必须成功,事务才能提交。如果一个或多个操作失败,则回滚整个事务。
1、在项目【DaprFrontEnd】中添加控制器-DaprStateController 用于展示状态的各种操作
[Route("[controller]")]
[ApiController]
public class StateController : ControllerBase
{
private readonly ILogger<DaprStateController> _logger;
private readonly DaprClient _daprClient;
public StateController(ILogger<StateController> logger, DaprClient daprClient)
{
_logger = logger;
_daprClient = daprClient;
}
/// <summary>
/// 获取值
/// </summary>
/// <returns></returns>
[HttpGet]
public async Task<ActionResult> GetAsync()
{
var result = await _daprClient.GetStateAsync<string>("statestore", "guid");
return Ok(result);
}
/// <summary>
/// 保存值
/// </summary>
/// <returns></returns>
[HttpPost]
public async Task<ActionResult> PostAsync()
{
await _daprClient.SaveStateAsync<string>("statestore", "guid", Guid.NewGuid().ToString(), new StateOptions() { Consistency = ConsistencyMode.Strong });
return Ok("done");
}
/// <summary>
/// 删除值
/// </summary>
/// <returns></returns>
[HttpDelete]
public async Task<ActionResult> DeleteAsync()
{
await _daprClient.DeleteStateAsync("statestore", "guid");
return Ok("done");
}
/// <summary>
/// 通过tag防止并发冲突,保存值
/// </summary>
/// <returns></returns>
[HttpPost("withtag")]
public async Task<ActionResult> PostWithTagAsync()
{
var (value, etag) = await _daprClient.GetStateAndETagAsync<string>("statestore", "guid");
await _daprClient.TrySaveStateAsync<string>("statestore", "guid", Guid.NewGuid().ToString(), etag);
return Ok("done");
}
/// <summary>
/// 通过tag防止并发冲突,删除值
/// </summary>
/// <returns></returns>
[HttpDelete("withtag")]
public async Task<ActionResult> DeleteWithTagAsync()
{
var (value, etag) = await _daprClient.GetStateAndETagAsync<string>("statestore", "guid");
return Ok(await _daprClient.TryDeleteStateAsync("statestore", "guid", etag));
}
/// <summary>
/// 从绑定获取值,健值name从路由模板获取
/// </summary>
/// <param name="state"></param>
/// <returns></returns>
[HttpGet("frombinding/{name}")]
public async Task<ActionResult> GetFromBindingAsync([FromState("statestore", "name")] StateEntry<string> state)
{
return await Task.FromResult<ActionResult>(Ok(state.Value));
}
/// <summary>
/// 根据绑定获取并修改值,健值name从路由模板获取
/// </summary>
/// <param name="state"></param>
/// <returns></returns>
[HttpPost("withbinding/{name}")]
public async Task<ActionResult> PostWithBindingAsync([FromState("statestore", "name")] StateEntry<string> state)
{
state.Value = Guid.NewGuid().ToString();
return Ok(await state.TrySaveAsync());
}
/// <summary>
/// 获取多个值
/// </summary>
/// <returns></returns>
[HttpGet("list")]
public async Task<ActionResult> GetListAsync()
{
var result = await _daprClient.GetBulkStateAsync("statestore", new List<string> { "guid" }, 10);
return Ok(result);
}
/// <summary>
/// 删除多个值
/// </summary>
/// <returns></returns>
[HttpDelete("list")]
public async Task<ActionResult> DeleteListAsync()
{
var data = await _daprClient.GetBulkStateAsync("statestore", new List<string> { "guid" }, 10);
var removeList = new List<BulkDeleteStateItem>();
foreach (var item in data)
{
removeList.Add(new BulkDeleteStateItem(item.Key, item.ETag));
}
await _daprClient.DeleteBulkStateAsync("statestore", removeList);
return Ok("done");
}
}
2、启动程序
dapr run --dapr-http-port 3501 --app-port 8230 --app-id frontend dotnet .\DaprFrontEnd.dll
3、调用过程:
Dapr 状态管理构建块提供了一个 API,用于在各种数据存储区中存储键/值数据。API 为以下内容提供支持:
批量操作
强一致性和最终一致性
乐观并发控制
多项事务