kata
我再次为我的公司在GeeCON 2016上举办了编程竞赛。 这次分配需要设计并根据以下要求选择实施系统:
一个系统每秒发送大约一千个事件。 每个Event
至少具有两个属性:
-
clientId
–我们期望一个客户端每秒最多可以处理几个事件 -
UUID
–全球唯一
消耗一个事件大约需要10毫秒。 设计此类流的使用者:
- 允许实时处理事件
- 与一个客户端有关的事件应按顺序进行处理,即,您不能并行处理同一
clientId
事件 - 如果10秒钟内出现重复的
UUID
,请将其删除。 假设10秒钟后不会出现重复
这些要求中没有几个重要的细节:
- 1000个事件/秒和10毫秒消耗一个事件。 显然,我们至少需要10个并发使用者才能实时消费。
- 事件具有自然的聚合ID(
clientId
)。 在一秒钟内,我们可以为给定的客户端预期一些事件,并且不允许我们同时或无序处理它们。 - 我们必须以某种方式忽略重复的消息,最有可能的是通过记住最近10秒钟内的所有唯一ID。 这使大约一万个
UUID
得以临时保留。
在本文中,我将指导您完成一些正确的解决方案,并尝试一些失败的尝试。 您还将学习如何使用少量精确定位的指标来解决问题。
天真的顺序处理
让我们通过迭代解决这个问题。 首先,我们必须对API进行一些假设。 想象一下:
interface EventStream {void consume(EventConsumer consumer);}@FunctionalInterface
interface EventConsumer {Event consume(Event event);
}@Value
class Event {private final Instant created = Instant.now();private final int clientId;private final UUID uuid;}
典型的基于推送的API,类似于JMS。 一个重要的注意事项是EventConsumer
正在阻止,这意味着直到EventConsumer
消耗了前一个Event
,它才交付新的Event
。 这只是我所做的一个假设,并没有彻底改变需求。 这也是JMS中消息侦听器的工作方式。 天真的实现只附加了一个侦听器,该侦听器需要大约10毫秒才能完成:
class ClientProjection implements EventConsumer {@Overridepublic Event consume(Event event) {Sleeper.randSleep(10, 1);return event;}}
当然,在现实生活中,这个使用者会在数据库中存储一些东西,进行远程调用等。我在睡眠时间分配中添加了一些随机性,以使手动测试更加实际:
class Sleeper {private static final Random RANDOM = new Random();static void randSleep(double mean, double stdDev) {final double micros = 1_000 * (mean + RANDOM.nextGaussian() * stdDev);try {TimeUnit.MICROSECONDS.sleep((long) micros);} catch (InterruptedException e) {throw new RuntimeException(e);}}}//...EventStream es = new EventStream(); //some real implementation here
es.consume(new ClientProjection());
它可以编译并运行,但是为了确定未满足要求,我们必须插入少量指标。 最重要的指标是消息消耗的延迟,以消息创建到开始处理之间的时间来衡量。 我们将为此使用Dropwizard指标 :
class ClientProjection implements EventConsumer {private final ProjectionMetrics metrics;ClientProjection(ProjectionMetrics metrics) {this.metrics = metrics;}@Overridepublic Event consume(Event event) {metrics.latency(Duration.between(event.getCreated(), Instant.now()));Sleeper.randSleep(10, 1);return event;}}
提取ProjectionMetrics
类以分离职责:
import com.codahale.metrics.Histogram;
import com.codahale.metrics.MetricRegistry;
import com.codahale.metrics.Slf4jReporter;
import lombok.extern.slf4j.Slf4j;import java.time.Duration;
import java.util.concurrent.TimeUnit;@Slf4j
class ProjectionMetrics {private final Histogram latencyHist;ProjectionMetrics(MetricRegistry metricRegistry) {final Slf4jReporter reporter = Slf4jReporter.forRegistry(metricRegistry).outputTo(log).convertRatesTo(TimeUnit.SECONDS).convertDurationsTo(TimeUnit.MILLISECONDS).build();reporter.start(1, TimeUnit.SECONDS);latencyHist = metricRegistry.histogram(MetricRegistry.name(ProjectionMetrics.class, "latency"));}void latency(Duration duration) {latencyHist.update(duration.toMillis());}
}
现在,当您运行朴素的解决方案时,您会Swift发现中值延迟以及99.9%的百分数无限增长:
type=HISTOGRAM, [...] count=84, min=0, max=795, mean=404.88540608274104, [...]median=414.0, p75=602.0, p95=753.0, p98=783.0, p99=795.0, p999=795.0
type=HISTOGRAM, [...] count=182, min=0, max=1688, mean=861.1706371990878, [...]median=869.0, p75=1285.0, p95=1614.0, p98=1659.0, p99=1678.0, p999=1688.0[...30 seconds later...]type=HISTOGRAM, [...] count=2947, min=14, max=26945, mean=15308.138585757424, [...]median=16150.0, p75=21915.0, p95=25978.0, p98=26556.0, p99=26670.0, p999=26945.0
30秒后,我们的应用程序平均会延迟15秒处理事件。 并非完全实时 。 显然,缺少并发是任何原因。 我们的ClientProjection
事件使用者大约需要10毫秒才能完成,因此每秒可以处理多达100个事件,而我们还需要一个数量级。 我们必须以某种方式扩展ClientProjection
。 而且我们甚至都没有触及其他要求!
天真线程池
最明显的解决方案是从多个线程调用EventConsumer
。 最简单的方法是利用ExecutorService
:
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;class NaivePool implements EventConsumer, Closeable {private final EventConsumer downstream;private final ExecutorService executorService;NaivePool(int size, EventConsumer downstream) {this.executorService = Executors.newFixedThreadPool(size);this.downstream = downstream;}@Overridepublic Event consume(Event event) {executorService.submit(() -> downstream.consume(event));return event;}@Overridepublic void close() throws IOException {executorService.shutdown();}
}
我们在这里使用装饰器模式 。 实现EventConsumer
的原始ClientProjection
是正确的。 但是,我们将其与EventConsumer
另一个实现并发包装。 这将使我们能够编写复杂的行为而无需更改ClientProjection
本身。 这样的设计促进:
- 松散耦合:各种
EventConsumer
彼此都不了解,可以自由组合 - 单一责任:每个人都做一份工作,然后委派给下一个组成部分
- 开放/封闭原则 :我们可以在不修改现有实现的情况下更改系统的行为。
打开/关闭原理通常通过注入策略和模板方法模式来实现。 在这里,它甚至更简单。 整体接线如下:
MetricRegistry metricRegistry =new MetricRegistry();
ProjectionMetrics metrics =new ProjectionMetrics(metricRegistry);
ClientProjection clientProjection =new ClientProjection(metrics);
NaivePool naivePool =new NaivePool(10, clientProjection);
EventStream es = new EventStream();
es.consume(naivePool);
我们精心设计的指标表明情况确实好得多:
type=HISToOGRAM, count=838, min=1, max=422, mean=38.80768197277468, [...]median=37.0, p75=45.0, p95=51.0, p98=52.0, p99=52.0, p999=422.0
type=HISTOGRAM, count=1814, min=1, max=281, mean=47.82642776789085, [...]median=51.0, p75=57.0, p95=61.0, p98=62.0, p99=63.0, p999=65.0[...30 seconds later...]type=HISTOGRAM, count=30564, min=5, max=3838, mean=364.2904915942238, [...]median=352.0, p75=496.0, p95=568.0, p98=574.0, p99=1251.0, p999=3531.0
然而,我们仍然看到延迟的规模越来越小,在30秒后,延迟达到了364毫秒。 它一直在增长,所以问题是系统的。 我们……需要……更多……指标。 请注意, NaivePool
(您很快就会知道为什么它是naive )有正好有10个线程NaivePool
。 这应该足以处理数千个事件,每个事件需要10毫秒来处理。 实际上,我们需要一点额外的处理能力,以避免垃圾收集后或负载高峰时出现问题。 为了证明线程池实际上是我们的瓶颈,最好监视其内部队列。 这需要一些工作:
class NaivePool implements EventConsumer, Closeable {private final EventConsumer downstream;private final ExecutorService executorService;NaivePool(int size, EventConsumer downstream, MetricRegistry metricRegistry) {LinkedBlockingQueue<Runnable> queue = new LinkedBlockingQueue<>();String name = MetricRegistry.name(ProjectionMetrics.class, "queue");Gauge<Integer> gauge = queue::size;metricRegistry.register(name, gauge);this.executorService = new ThreadPoolExecutor(size, size, 0L, TimeUnit.MILLISECONDS, queue);this.downstream = downstream;}@Overridepublic Event consume(Event event) {executorService.submit(() -> downstream.consume(event));return event;}@Overridepublic void close() throws IOException {executorService.shutdown();}
}
这里的想法是手动创建ThreadPoolExecutor
,以提供自定义的LinkedBlockingQueue
实例。 我们稍后可以使用该队列来监视其长度(请参阅: ExecutorService – 10个技巧 )。 Gauge
将定期调用queue::size
并将其报告给您需要的地方。 度量标准确认线程池大小确实是一个问题:
type=GAUGE, name=[...].queue, value=35
type=GAUGE, name=[...].queue, value=52[...30 seconds later...]type=GAUGE, name=[...].queue, value=601
容纳待处理任务的队列的大小不断增加,这会损害延迟。 线程池大小从10增加到20最终报告了不错的结果,并且没有停顿。 但是,我们仍然没有解决重复项,也没有针对同一clientId
防止事件的同时修改。
晦涩的锁定
让我们从避免对同一clientId
的事件进行并发处理开始。 如果两个事件接连发生,并且都与同一个clientId
相关,那么NaivePool
将选择它们并开始同时处理它们。 首先,我们至少通过为每个clientId
设置一个Lock
来发现这种情况:
@Slf4j
class FailOnConcurrentModification implements EventConsumer {private final ConcurrentMap<Integer, Lock> clientLocks = new ConcurrentHashMap<>();private final EventConsumer downstream;FailOnConcurrentModification(EventConsumer downstream) {this.downstream = downstream;}@Overridepublic Event consume(Event event) {Lock lock = findClientLock(event);if (lock.tryLock()) {try {downstream.consume(event);} finally {lock.unlock();}} else {log.error("Client {} already being modified by another thread", event.getClientId());}return event;}private Lock findClientLock(Event event) {return clientLocks.computeIfAbsent(event.getClientId(),clientId -> new ReentrantLock());}}
这肯定是朝错误的方向前进。 复杂程度不堪重负,但是运行此代码至少表明存在问题。 事件处理管道如下所示,一个装饰器包装了另一个装饰器:
ClientProjection clientProjection =new ClientProjection(new ProjectionMetrics(metricRegistry));
FailOnConcurrentModification failOnConcurrentModification =new FailOnConcurrentModification(clientProjection);
NaivePool naivePool =new NaivePool(10, failOnConcurrentModification, metricRegistry);
EventStream es = new EventStream();es.consume(naivePool);
偶尔会弹出错误消息,告诉我们其他一些线程已经在处理同一clientId
事件。 对于每个clientId
我们关联一个检查的Lock
,以便确定当前是否有另一个线程不在处理该客户端。 尽管丑陋,但实际上我们已经接近残酷的解决方案。 而不是因为另一个线程已经在处理某个事件而无法获取Lock
时失败,让我们稍等一下,希望Lock
可以被释放:
@Slf4j
class WaitOnConcurrentModification implements EventConsumer {private final ConcurrentMap<Integer, Lock> clientLocks = new ConcurrentHashMap<>();private final EventConsumer downstream;private final Timer lockWait;WaitOnConcurrentModification(EventConsumer downstream, MetricRegistry metricRegistry) {this.downstream = downstream;lockWait = metricRegistry.timer(MetricRegistry.name(WaitOnConcurrentModification.class, "lockWait"));}@Overridepublic Event consume(Event event) {try {final Lock lock = findClientLock(event);final Timer.Context time = lockWait.time();try {final boolean locked = lock.tryLock(1, TimeUnit.SECONDS);time.stop();if(locked) {downstream.consume(event);}} finally {lock.unlock();}} catch (InterruptedException e) {log.warn("Interrupted", e);}return event;}private Lock findClientLock(Event event) {return clientLocks.computeIfAbsent(event.getClientId(),clientId -> new ReentrantLock());}}
这个想法非常相似。 但是, tryLock()
失败,它最多等待1秒,以希望释放给定客户端的Lock
。 如果两个事件很快相继发生,一个事件将获得一个Lock
并继续执行,而另一个事件将阻止等待unlock()
发生。
不仅这些代码确实令人费解,而且还可能以许多微妙的方式被破坏。 例如,如果几乎同一时间发生同一客户clientId
两个事件,但是显然是第一个事件呢? 这两个事件将同时请求Lock
,并且我们无法保证哪个事件会首先获得不公平的Lock
,从而可能会乱序使用事件。 肯定有更好的办法…
专用线程
让我们退后一步,深吸一口气。 您如何确保事情不会同时发生? 好吧,只需使用一个线程! 事实上,这是我们一开始所做的,但吞吐量并不令人满意。 但是我们不关心不同的clientId
的并发性,我们只需要确保具有相同clientId
事件始终由同一线程处理即可!
也许您会想到创建从clientId
到Thread
的映射? 好吧,这将过于简单化。 我们将创建数千个线程,每个线程在大多数时间都根据需求空闲(对于给定的clientId
每秒只有很少的事件)。 一个不错的折衷方案是固定大小的线程池,每个线程负责clientId
的众所周知的子集。 这样,两个不同的clientId
可能会终止在同一线程上,但是同一clientId
将始终由同一线程处理。 如果出现同一clientId
两个事件,则它们都将被路由到同一线程,从而避免了并发处理。 实现非常简单:
class SmartPool implements EventConsumer, Closeable {private final List<ExecutorService> threadPools;private final EventConsumer downstream;SmartPool(int size, EventConsumer downstream, MetricRegistry metricRegistry) {this.downstream = downstream;List<ExecutorService> list = IntStream.range(0, size).mapToObj(i -> Executors.newSingleThreadExecutor()).collect(Collectors.toList());this.threadPools = new CopyOnWriteArrayList<>(list);}@Overridepublic void close() throws IOException {threadPools.forEach(ExecutorService::shutdown);}@Overridepublic Event consume(Event event) {final int threadIdx = event.getClientId() % threadPools.size();final ExecutorService executor = threadPools.get(threadIdx);executor.submit(() -> downstream.consume(event));return event;}
}
关键部分就在最后:
int threadIdx = event.getClientId() % threadPools.size();
ExecutorService executor = threadPools.get(threadIdx);
这个简单的算法将始终对相同的clientId
使用相同的单线程ExecutorService
。 不同的ID可在同一池中结束,例如,当池大小是20
,客户机7
, 27
, 47
等,将使用相同的线程。 但这可以,只要一个clientId
始终使用同一线程即可。 此时,不需要锁定,并且可以保证顺序调用,因为同一客户端的事件始终由同一线程执行。 旁注:每个clientId
一个线程无法扩展,但是每个clientId
一个角色(例如,在Akka中)是一个很好的主意,它可以简化很多工作。
为了更加安全,我在每个线程池中插入了平均队列大小的指标,从而使实现更长:
class SmartPool implements EventConsumer, Closeable {private final List<LinkedBlockingQueue<Runnable>> queues;private final List<ExecutorService> threadPools;private final EventConsumer downstream;SmartPool(int size, EventConsumer downstream, MetricRegistry metricRegistry) {this.downstream = downstream;this.queues = IntStream.range(0, size).mapToObj(i -> new LinkedBlockingQueue<Runnable>()).collect(Collectors.toList());List<ThreadPoolExecutor> list = queues.stream().map(q -> new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, q)).collect(Collectors.toList());this.threadPools = new CopyOnWriteArrayList<>(list);metricRegistry.register(MetricRegistry.name(ProjectionMetrics.class, "queue"), (Gauge<Double>) this::averageQueueLength);}private double averageQueueLength() {double totalLength =queues.stream().mapToDouble(LinkedBlockingQueue::size).sum();return totalLength / queues.size();}//...}
如果您偏执狂,您甚至可以为每个队列创建一个指标。
重复数据删除和幂等
在分布式环境中,当生产者至少有一次保证时,接收重复事件是很常见的。 这种行为的原因不在本文讨论范围之内,但我们必须学习如何解决该问题。 一种方法是将全局唯一标识符( UUID
)附加到每条消息,并确保在消费者方面不会对具有相同标识符的消息进行两次处理。 每个Event
都有这样的UUID
。 根据我们的要求,最直接的解决方案是简单地存储所有可见的UUID
并在到达时验证接收到的UUID
从未见过。 按原样使用ConcurrentHashMap<UUID, UUID>
(JDK中没有ConcurrentHashSet
)会导致内存泄漏,因为随着时间的推移,我们将不断积累越来越多的ID。 这就是为什么我们仅在最近10秒内查找重复项。 从技术上讲,您可以拥有ConcurrentHashMap<UUID, Instant>
,当遇到该问题时,它会从UUID
映射到时间戳。 通过使用后台线程,我们可以删除10秒钟以上的元素。 但是,如果您是快乐的Guava用户,则具有声明性驱逐策略的Cache<UUID, UUID>
将达到目的:
import com.codahale.metrics.Gauge;
import com.codahale.metrics.Meter;
import com.codahale.metrics.MetricRegistry;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;import java.util.UUID;
import java.util.concurrent.TimeUnit;class IgnoreDuplicates implements EventConsumer {private final EventConsumer downstream;private Cache<UUID, UUID> seenUuids = CacheBuilder.newBuilder().expireAfterWrite(10, TimeUnit.SECONDS).build();IgnoreDuplicates(EventConsumer downstream) {this.downstream = downstream;}@Overridepublic Event consume(Event event) {final UUID uuid = event.getUuid();if (seenUuids.asMap().putIfAbsent(uuid, uuid) == null) {return downstream.consume(event);} else {return event;}}
}
为了保证生产安全,我至少可以想到两个指标可能会有用:缓存大小和发现的重复项数量。 让我们也插入以下指标:
class IgnoreDuplicates implements EventConsumer {private final EventConsumer downstream;private final Meter duplicates;private Cache<UUID, UUID> seenUuids = CacheBuilder.newBuilder().expireAfterWrite(10, TimeUnit.SECONDS).build();IgnoreDuplicates(EventConsumer downstream, MetricRegistry metricRegistry) {this.downstream = downstream;duplicates = metricRegistry.meter(MetricRegistry.name(IgnoreDuplicates.class, "duplicates"));metricRegistry.register(MetricRegistry.name(IgnoreDuplicates.class, "cacheSize"), (Gauge<Long>) seenUuids::size);}@Overridepublic Event consume(Event event) {final UUID uuid = event.getUuid();if (seenUuids.asMap().putIfAbsent(uuid, uuid) == null) {return downstream.consume(event);} else {duplicates.mark();return event;}}
}
最终,我们拥有了构建解决方案的所有要素。 这个想法是由彼此包装的EventConsumer
实例组成管道:
- 首先,我们应用
IgnoreDuplicates
拒绝重复项 - 然后,我们调用
SmartPool
,它将始终将给定的clientId
到同一线程,并在该线程中执行下一阶段 - 最后,调用真正执行业务逻辑的
ClientProjection
。
您可以选择在SmartPool
和ClientProjection
之间放置FailOnConcurrentModification
步骤,以提高安全性(设计上不应进行并发修改):
ClientProjection clientProjection =new ClientProjection(new ProjectionMetrics(metricRegistry));
FailOnConcurrentModification concurrentModification =new FailOnConcurrentModification(clientProjection);
SmartPool smartPool =new SmartPool(12, concurrentModification, metricRegistry);
IgnoreDuplicates withoutDuplicates =new IgnoreDuplicates(smartPool, metricRegistry);
EventStream es = new EventStream();
es.consume(withoutDuplicates);
我们花了很多工作才能提出相对简单且结构合理的解决方案(我希望您同意)。 最后,解决并发问题的最佳方法是……避免并发,并在一个线程中运行受竞争条件约束的代码。 这也是Akka actor(每个actor处理单个消息)和RxJava( Subscriber
处理的一条消息)背后的思想。 在下一部分中,我们将在RxJava中看到声明式解决方案。
翻译自: https://www.javacodegeeks.com/2016/10/small-scale-stream-processing-kata-part-1-thread-pools.html
kata