最近的一个项目是风控过程数据实时统计分析和聚合的一个 OLAP 分析平台,日流量峰值在 10 到 12 亿上下,每年数据约 4000 亿条,占用空间大概 200T。
经过对 Elasticsearch 多方调研和超过几百亿条数据的插入和聚合查询的验证之后,我们总结出以下几种能够有效提升性能和解决这一问题的方案:
存,以时间为轴,数据只有增加,没有变更,并且必须包含 timestamp(日期时间,名称随意)字段。 其作用和意义要大于数据的 id 字段,常见的数据比如我们通常要记录的操作日志、用户行为日志、或股市行情数据、服务器 CPU、内存、网络的使用率等。
取,一定是以时间范围为第一过滤条件,然后是其他查询条件,比如近一天、一周、本月等等,然后在这个范围内进行二次过滤。 比如性别或地域等,查询结果中比较关注的是每条数据和 timestamp 字段具体发生的时间点,而非 id。
为了让集群有更好的性能表现,我们应该对这两个角色有一个更好的规划,在 Nodes 之间做读取分离,集群的稳定性和快速响应,在大规模的数据存储和查询的压力之下能够坦然面对,各自愉快的协作。
Master Node,整个集群的管理者,负有对 index 的管理、shards 的分配,以及整个集群拓扑信息的管理等功能。
众所周知,Master Node 可以通过 Data Node 兼任,但是,如果对群集规模和稳定要求很高的话,就要职责分离,Master Node 推荐,它的状态关乎整个集群的存活。
这样 Master 不参与 I、O,从数据的搜索和索引操作中出来,专门负责集群的管理工作,因此 Master Node 的节点配置可以相对低一些。
这个参数决定选举 Master 的 Node 个数,太小容易发生“脑裂”,可能会出现多个 Master,太大 Master 将无法选举。
这些操作严重消耗系统的 CPU、内存、IO 等资源,因此,应该把最好的资源分配给 Data Node,因为它们是真正干累活的角色,同样 Data Node 也不兼任 Master 的功能。
正如我们知道的一样,在某个 Node 收到用户请求的时候,会将请求转发到集群中所有索引相关的 Node 上,之后将每个 Node 的计算结果合并后返回给请求方。
我们暂且将这个 Node 称为查询节点,整个过程跟分布式数据库原理类似。那问题来了,这个查询节点如果在并发和数据量比较大的情况下,由于数据的聚合可能会让内存和网络出现瓶颈。
因此,在职责分离指导思想的前提下,这些操作我们也应该从这些角色中剥离出来,称它是 Coordinating Nodes,只负责由用户的请求,包括读、写等操作,对内存、网络和 CPU 要求比较高。
本质上,Coordinating Only Nodes 可以笼统的理解为是一个负载均衡器,或者反向代理,只负责读,本身不写数据,它的配置是:
那是不是越多越好呢?在一定范围内是肯定的,但凡事有个度,过了负作用就会突显,太多的话会给集群增加负担。
search.remote.connect 是禁用跨集群查询,防止在进行集群之间查询时发生二次由:
类似于分布式数据库中的分片原则,将符合规则的数据存储到同一分片。ES 通过哈希算法来决定数据存储于哪个 Shard:
简单的来说,一个查询请求过来以后会查询每个 Shard,然后做结果聚合,总的时间大概就是所有 Shard 查询所消耗的时间之和。
会根据 Routing 查询特定的一个或多个 Shard,这样就大大减少了查询时间,提高了查询效率。
当然,如何设置 Routing 是一个难点,需要一点技巧,要根据业务特点合理组合 Routing 的值,来划分 Shard 的存储,最终保持数据量相对均衡。
可以组合几个维度做为 Routing ,有点类似于 Hbase Key,例如不同的业务线加不同的类别,不同的城市和不同的类型等等,如:
再例如我们的业务场景,A 业务线的数据量级远远大于 B 业务线,有时候很难通过 Routing 指定一个值数据在所有 Shards 上均匀分布,会让部分 Shard 变的越来越大,影响查询性能,怎么办?
这样可以根据需要在不同 Index 之间查询,然而每个 Index 中 Shards 的数据可以做到相对均衡。
另一种办法是指定 Index 参数 index.routing_partition_size,来解决最终可能产生群集不均衡的问题,指定这个参数后新的算法如下:
很多情况下,指定 Routing 后会大幅提升查询性能,毕竟查询的 Shard 只有那么几个,但是如何设置 Routing 是个难题,可根据业务特性巧妙组合。
Index 通过横向扩展 Shards 实现分布式存储,这样可以解决 Index 大数据存储的问题。
但在一个 Index 变的越来越大,单个 Shard 也越来越大,查询和存储的速度也越来越慢。
考虑到 I、O,针对 Index 每个 Node 的 Shards 数最好不超过 3 个,那面对这样一个庞大的 Index,我们是采用更多的 Shards,还是更多的 Index,我们如何选择?
Index 的 Shards 总量也不宜太多,更多的 Shards 会带来更多的 I、O 开销,其实答案就已经很明确,除非你能接受长时间的查询等待。
Index 拆分的思很简单,时序索引有一个好处就是只有增加,没有变更,按时间累积,天然对索引的拆分友好支持,可以按照时间和数据量做任意时间段的拆分。
我们知道 ES 可以为同一目的或同一类索引创建一个 Index Template,之后创建的索引只要符合匹配规则就会套用这个 Template,不必每次指定 Settings 和 Mappings 等属性。
有冲突通过 Template 的属性order : 0 从低到高覆盖(这部分据说会在 ES6 中会做调整,更好的解决 Template 匹配冲突问题)。
Rollover Index 可以将现有的索引通过一定的规则,如数据量和时间,索引的命名必须是 logs-000001 这种格式,并指定 aliases,示例:
如果索引中的数据大于规则中指定的数据量或者时间过时,新的索引将被创建,索引名称为 logs-000002,并根据规则套用 Index Template,同时别名 logs_write 也将被变更到 logs-000002。
这样就会按日期进行切割索引,那如果你想查询 3 天内的数据可以通过日期规则来匹配索引名,如:
到此,我们就可以通过 Index Template 和 Rollover API 实现对 Index 的任意拆分,并按照需要进行任意时间段的合并查询,这样只要你的时间跨度不是很大,查询速度一般可以控制在毫秒级,存储性能也不会遇到瓶颈。
冷热架构,为了大规模时序索引实时数据分析的时效性,可以根据资源配置不同将 Data Nodes 进行分类形成分层或分组架构。
即 Hot-Warm 架构,对 CPU,磁盘、内存等硬件资源合理的规划和利用,达到性能和效率的最大化。
拥有最好资源的 Data Nodes,如更高性能的 CPU、SSD 磁盘、内存等资源,这些特殊的 Nodes 支持索引所有的读、写操作。
为这类 Data Nodes 打上标签,以便我们在 Template 中识别和指定,启动参数如下:
存储只读数据,并且查询量较少,但用于存储长多时间历史数据的 Data Nodes,这类 Nodes 相对 Hot Nodes 较差的硬件配置,根据需求配置稍差的 CPU、机械磁盘和其他硬件资源,至于数量根据需要保留多长时间的数据来配比,同样需要打上标签,方法跟 Hot Nodes 一样,指定为 Warm,box_type: warm。
同样符合命名规则索引的 Shard 会被分配到 Warm Nodes 上,我们指定了更少的 Shards 数量和复本数。
注意,这里的复本数为 0,和 best_compression 级别的压缩,方便做迁移等操作,以及进行一些数据的压缩:
假如我们已经创建了索引 active-logs-1 ,当然你可以通过 _bulk API 快速写入测试的数据,然后参考上文中介绍的 Rollover API 进行切割。
Cluster Health API 可以查看迁移状态,完成后进行收缩操作,其实相当于复制出来一个新的索引,旧的索引还存在。
到目前为止我们已经实现了索引的冷热分离,和索引的收缩,我们知道每个 Shard 下面由多个 Segment 组成,那 inactive-logs-1 的 Shard 数是 1,但 Segment 还是多个。
这类索引不会在接受写入操作,为了节约空间和改善查询性能,通过 Forcemerge API 将 Segment 适量合并:
走到这里,我们就已经实现了 Index 的 Hot-Warm 架构,根据业务特点将一段时间的数据放在 Hot Nodes,更多的历史数据存储于 Warm Nodes。
在多条业务线的索引共用一个 ES 集群时会发生流量被独吃独占的情况,因为大家都共用相同的集群资源,流量大的索引会占用大部分计算资源而流量小的也会被拖慢,得不到即时响应,或者说业务流量大的索引可以按天拆分,几个流量小的索引可以按周或月拆分。
除了 Master Nodes 以外,Data Nodes 和 Coordinating Nodes 都可以使用(其实如果每个索引的量都特别大也应该采用这种架构),还有一个好处是对方的某个 Node 挂掉,自己不受影响。
等打标签的方式来区别对应不同的索引,然后在各自的 Index Template 中指定不同的 node.attr.zone 即可。
如果你的业务遍布全国各地,四海八荒,如果你数据要存储到多个机房,如果你的 Index 有几万个甚至更多( Index 特别多,集群庞大会导致 Cluster State 信息量特别大,因为 Cluster State 包含了所有 Shard、Index、Node 等所有相关信息,它存储在每个 Node 上,这些数据发生变化都会实时同步到所有 Node 上,当这个数据很大的时候会对集群的性能造成影响)。
所不同的是 Tribe 是针对多个 ES 集群之间的所有节点,Tribe Node 收到请求到相关集群中所有节点,将结果合并处理后返回。
表面上看起来 Tribe Node 将多个集群成了一个整体,遇到同名 Index 发生冲突,会选择其中一个 Index,也可以指定:
提交以后整个集群所有节点都会生效,都可以做为代理去做跨集群联合查询,不过我们最好还是通过 Coordinating Only Nodes 去发起请求。
目前这个功能还在测试阶段,但未来可能会取代 Tribe Node,之间的最大的差异是 Tribe Node 需要设置的节点,而 Cross Cluster Search 不需要,集群中的任意一个节点都可以兼任。
比如可以用我们的 Coordinating Only Nodes 做为联合查询节点,另一个优点是配置是动态的,不需要重启节点。
实际上可以理解为是一个 ES 集群之间特定的动态代理工具,支持所有操作,包括 Index 的创建和修改,并且通过 Namespace 对 Index 进行隔离,也解决了 Tribe Node 之 Index 名称冲突的问题。
我们在文中介绍了几种方案用来解决时序索引的海量数据存储和查询的问题,根据业务特点和使用场景来单独或组合使用能够发挥出意想不到的效果。
特别是 Nodes 之间的读写分离、索引拆分、Hot-Warm 等方案的组合应用对索引的查询和存储性能有显著的提升。
如果你的索引 Mapping 中没有 parent-child 关联关系可以考虑使用,钱包颜色与财运对查询的性能提升非常有效。