By
wz2cool
更新日期:
项目地址:https://github.com/wz2cool/local-queue
简介
一直以来我们都可以比较方便时用 jvm 队列比如 LinkedBlockingQueue 做一些缓存, 或者我们想削峰,远程调用使用的是 kafka 或者 rocketmq,这都是非常有用的做法,但是我们可能也会遇到一些问题,
对于 jvm 队列
- 如果内存队列是无界队列可能会有内存溢出(out-of-memory)的风险
- 一但程序关闭,所有在内存中的数据都会丢失,如果没有及时持久化的话数据就没了
- 没有位置信息,无法做消费位置移动。
对于 Kafka/rocketmq
- 受到网络的影响,如果网络延迟可能会导致消息阻塞的和延迟
- 对于特别小的及时消息对于 kafka 不友好,kafka 目标是大吞吐,希望是攒批消息
- 受到目标中间件高可用的影响,一但中间件不可用可能会存在丢消息情况
基于上述情况,我们可以看一下 local-queue,本项目基于 Chronicle 框架封装, Chronicle 是一个微妙级别延迟的框架,并且底层使用的是 mmap 技术,所以如果了解 kafka/rocketmq 会非常熟悉,这也使得 local-queue 在延迟上和性能上得到了基础保障。
功能
- 一写多读
- 保留消费者消费位置信息,可以断点续读
- 移动消费位点,从新的消费位点续读
- IPC 进程间通信(同一台机器两个 jvm 应用)
案例
一写多读
多个消费者可以读取同一份消息,消费者之间不同使用的是 consumerId 的区分,默认情况下消费者都是从最新的位置开始读,如果从最头部开始读会导致瞬发的 IO
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37
| @Test public void testMultiConsumers() throws InterruptedException { try (SimpleQueue queue = new SimpleQueue(config)) { Thread consumer1Thread = new Thread(() -> { IConsumer consumer1 = queue.getConsumer("consumer1"); try { QueueMessage take = consumer1.take(); assertEquals("testContent", take.getContent()); assertEquals("testMessageKey", take.getMessageKey()); System.out.println("consumer1 take message"); } catch (InterruptedException e) { Thread.currentThread().interrupt(); } }); Thread consumer2Thread = new Thread(() -> { IConsumer consumer2 = queue.getConsumer("consumer2"); try { QueueMessage take = consumer2.take(); assertEquals("testContent", take.getContent()); assertEquals("testMessageKey", take.getMessageKey()); System.out.println("consumer2 take message"); } catch (InterruptedException e) { Thread.currentThread().interrupt(); } }); consumer1Thread.start(); consumer2Thread.start(); Thread.sleep(2000); queue.offer("testMessageKey", "testContent"); consumer1Thread.join(); consumer2Thread.join(); } }
|
断点续读
local-queue 支持定时把消费位置存到本地,一般有个 position.dat 文件,如果下一次同一个 consumerId 再开始读的时候会从 position.dat 文件中读取位置,并且从这个位置续读
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38
| @Test public void testResumeConsumer() throws InterruptedException { try (SimpleQueue queue = new SimpleQueue(config)) { for (int i = 0; i < 10; i++) { String key = "key" + i; String content = "content" + i; queue.offer(key, content); } Thread.sleep(100); String consumerId = "consumer1"; try (IConsumer consumer1 = queue.getConsumer(consumerId, ConsumeFromWhere.FIRST)) { Thread.sleep(100); List<QueueMessage> queueMessages = consumer1.batchTake(5); assertEquals(5, queueMessages.size()); QueueMessage lastOne = queueMessages.get(queueMessages.size() - 1); assertEquals("key4", lastOne.getMessageKey()); consumer1.ack(queueMessages); Thread.sleep(100); } try (IConsumer consumer1 = queue.getConsumer(consumerId, ConsumeFromWhere.FIRST)) { Thread.sleep(100); List<QueueMessage> queueMessages = consumer1.batchTake(3); assertEquals(3, queueMessages.size()); QueueMessage lastOne = queueMessages.get(queueMessages.size() - 1); assertEquals("key7", lastOne.getMessageKey()); consumer1.ack(queueMessages); Thread.sleep(100); } } }
|
重置位点
重置位点应该对 mq 是很重要的一个功能,一般当我们出问题了,希望回放一下数据就需要这个功能
根据 position 重置 (推荐)
这个是性能非常高的一种重置位点方式,可以从 QueueMessage 获取 position, 以为很多操作都是异步的所以这里 demo 会使用 Thread.sleep 等一下
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28
| @Test public void testMoveToPosition() throws InterruptedException { try (SimpleQueue queue = new SimpleQueue(config)) { for (int i = 0; i < 10; i++) { String key = "key" + i; String content = "content" + i; queue.offer(key, content); Thread.sleep(100); } IConsumer consumer1 = queue.getConsumer("consumer1", ConsumeFromWhere.FIRST); Thread.sleep(100); List<QueueMessage> queueMessages = consumer1.batchTake(10); QueueMessage example = queueMessages.get(3); consumer1.ack(queueMessages); Thread.sleep(100); boolean moveToResult = consumer1.moveToPosition(example.getPosition()); assertTrue(moveToResult); QueueMessage takeMessage = consumer1.take(); assertEquals("key3", takeMessage.getMessageKey()); assertEquals(example.getPosition(), takeMessage.getPosition()); assertEquals(example.getMessageKey(), takeMessage.getMessageKey()); } }
|
根据时间戳重置(探索阶段)
通过位置重置位点是一个非常高效的做法,但是往往我们有时候不知道位点,我们想重置到某个时间点上,因为原生不支持,所以先用遍历的方法找到位置,可能会性能上会有些延迟,性能上应该还可以优化
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29
| @Test public void testMoveToTimestamp() throws InterruptedException { try (SimpleQueue queue = new SimpleQueue(config)) { for (int i = 0; i < 10; i++) { String key = "key" + i; String content = "content" + i; queue.offer(key, content); Thread.sleep(100); } IConsumer consumer1 = queue.getConsumer("consumer1", ConsumeFromWhere.FIRST); Thread.sleep(100); List<QueueMessage> queueMessages = consumer1.batchTake(10); QueueMessage example = queueMessages.get(3); consumer1.ack(queueMessages); Thread.sleep(100); long writeTime = example.getWriteTime(); boolean moveToResult = consumer1.moveToTimestamp(writeTime); assertTrue(moveToResult); QueueMessage takeMessage = consumer1.take(); assertEquals("key3", takeMessage.getMessageKey()); assertEquals(example.getPosition(), takeMessage.getPosition()); assertEquals(example.getMessageKey(), takeMessage.getMessageKey()); } }
|
本地 IPC 通信
local-queue 不光支持多线程操作,而且支持本地多进程操作,我们知道 mmap 就是进程间通信的一种。这里我们不能再使用 SimpleQueue 要使用比较底层的封装 SimpleProducer 和 SimpleConsumer, 和 kafka/rocketmq 类似,一个进程生产数据,另外一个进程消费数据。
生产者程序
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
| public class Program {
public static void main(String[] args) { SimpleProducerConfig config = new SimpleProducerConfig.Builder() .setDataDir(new File("./test")) .setKeepDays(7) .build();
try (SimpleProducer producer = new SimpleProducer(config)) { while (true) { String formattedDateTime = LocalDateTime.now().format(DateTimeFormatter.ISO_LOCAL_DATE_TIME); String message = "Produce message at: " + formattedDateTime; producer.offer(message); } } } }
|
消费者程序
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23
| public class Program {
public static void main(String[] args) throws InterruptedException { SimpleConsumerConfig config = new SimpleConsumerConfig.Builder() .setDataDir(new File("./test")) .setPositionFile(new File("./test/myPosition.dat")) .setConsumerId("consumerId") .setConsumeFromWhere(ConsumeFromWhere.LAST) .build();
try (SimpleConsumer consumer = new SimpleConsumer(config)) { while (true) { QueueMessage message = consumer.take(); System.out.println("[receive:]" + message.getContent()); consumer.ack(message); } } } }
|
打印出来结果
1 2 3 4 5 6 7 8 9 10
| [receive:]Produce message at: 2025-01-31T18:34:11.588 [receive:]Produce message at: 2025-01-31T18:34:11.588 [receive:]Produce message at: 2025-01-31T18:34:11.588 [receive:]Produce message at: 2025-01-31T18:34:11.588 [receive:]Produce message at: 2025-01-31T18:34:11.588 [receive:]Produce message at: 2025-01-31T18:34:11.588 [receive:]Produce message at: 2025-01-31T18:34:11.588 [receive:]Produce message at: 2025-01-31T18:34:11.588 [receive:]Produce message at: 2025-01-31T18:34:11.588 ...
|
结尾
local-queue 刚才完成了一个初版,还有很多问题需要优化,大家如果有问题欢迎提出