最近在做一个基于Azure云的物联网分析项目:
.netcore采集程序向Azure事件中心(EventHubs)发送数据,通过Azure EventHubs Capture转储到Azure BlogStorage,供数据科学团队分析。
Azure事件中心是一种Azure上完全托管的实时数据摄取服务, 每秒可流式传输来自website、app、device任何源的数百万个事件。提供的统一流式处理平台和时间保留缓冲区,将事件生成者和事件使用者分开。
分区使用者模式
提供消息流式处理功能,提高可用性和并行化例如,如果事件中心具有四个分区,并且其中一个分区要在负载均衡操作中从一台服务器移动到另一台服务器,则仍可以通过其他三个分区进行发送和接收。 此外,具有更多分区可以让更多并发读取器处理数据,从而提高聚合吞吐量。 了解分布式系统中分区和排序的意义是解决方案设计的重要方面。 为了帮助说明排序与可用性之间的权衡,请参阅 CAP 定理
最直观的方式:请在portal.azure.cn门户站点---->创建事件中心命名空间---> 创建事件中心
我们使用Asp.NetCore以Azure App Service形式部署,依赖Azure App Service的自动缩放能录应对物联网的潮汐大流量。
通常推荐批量发送到事件中心,能有效增加web服务的吞吐量和响应能力。
目前新版SDk: Azure.Messaging.EventHubs仅支持分批
发送。
EventHubProducerClient
客户端负责分批发送数据到事件中心,根据发送时指定的选项,事件数据可能会自动路由到可用分区或发送到特定请求的分区。在以下情况下,建议允许自动路由分区:
1) 事件的发送必须高度可用
2) 事件数据应在所有可用分区之间平均分配。
自动路由分区的规则:
1)使用循环法将事件平均分配到所有可用分区中
2)如果某个分区不可用,事件中心将自动检测到该分区并将消息转发到另一个可用分区。
我们要注意,根据选定的 命令空间定价层, 每批次发给事件中心的最大消息大小也不一样:
这里我们就需要思考: web程序收集数据是以个数
为单位; 但是我们分批发送时要根据分批的字节大小
来切分。
我的方案是: 因引入TPL Dataflow 管道:
TransformBlock<string, EventData>
BatchBlock<EventData>
按照个数打包ActionBlock<EventData[]>
在包内 累积指定字节大小批量发送核心的TPL Dataflow代码如下:
public class MsgBatchSender
{
private readonly EventHubProducerClient Client;
private readonly TransformBlock<string, EventData> _transformBlock;
private readonly BatchBlock<EventData> _packer;
private readonly ActionBlock<EventData[]> _batchSender;
private readonly DataflowOption _dataflowOption;
private readonly Timer _trigger;
private readonly ILogger _logger;
public MsgBatchSender(EventHubProducerClient client, IOptions<DataflowOption> option,ILoggerFactory loggerFactory)
{
Client = client;
_dataflowOption = option.Value;
var dfLinkoption = new DataflowLinkOptions { PropagateCompletion = true };
_transformBlock = new TransformBlock<string, EventData>(
text => new EventData(Encoding.UTF8.GetBytes(text)),
new ExecutionDataflowBlockOptions
{
MaxDegreeOfParallelism = _dataflowOption.MaxDegreeOfParallelism
});
_packer = new BatchBlock<EventData>(_dataflowOption.BatchSize);
_batchSender = new ActionBlock<EventData[]>(msgs=> BatchSendAsync(msgs));
_packer.LinkTo(_batchSender, dfLinkoption);
_transformBlock.LinkTo(_packer, dfLinkoption, x => x != null);
_trigger = new Timer(_ => _packer.TriggerBatch(), null, TimeSpan.Zero, TimeSpan.FromSeconds(_dataflowOption.TriggerInterval));
_logger = loggerFactory.CreateLogger<DataTrackerMiddleware>();
}
private async Task BatchSendAsync(EventData[] msgs)
{
try
{
if (msgs != null)
{
var i = 0;
while (i < msgs.Length)
{
var batch = await Client.CreateBatchAsync();
while (i < msgs.Length)
{
if (batch.TryAdd(msgs[i++]) == false)
{
break;
}
}
if(batch!= null && batch.Count>0)
{
await Client.SendAsync(batch);
batch.Dispose();
}
}
}
}
catch (Exception ex)
{
// ignore and log any exception
_logger.LogError(ex, "SendEventsAsync: {error}", ex.Message);
}
}
public async Task<bool> PostMsgsync(string txt)
{
return await _transformBlock.SendAsync(txt);
}
public async Task CompleteAsync()
{
_transformBlock.Complete();
await _transformBlock.Completion;
await _batchSender.Completion;
await _batchSender.Completion;
}
}
源码地址:https://github.com/zaozaoniao/SaicEnergyTracker
如何利用.NETCore向Azure EventHubs准实时批量发送数据?
原文:https://www.cnblogs.com/JulianHuang/p/13230907.html