最近项目里需要写模型,然后数据是从es取,不同人负责写不同模型,这里遇到一个问题,就是数据的读其实公用,没必要各自取一次数据浪费带宽、内存、cpu…遂想是否可以提取公用部分做数据读,下发给各个模型,然后开发老大说用disruptor试试,调研了下确实可行,就稍加记录下。
disruptor项目地址:https://github.com/LMAX-Exchange/disruptor
简单来说disruptor是一个高性能的异步的消息处理框架。
disruptor最核心的概念是ringbuffer(环形缓冲队列),下图解释所有↓

ringbuffer就如上一个环形队列,生产者往队列写数据,消费者读取数据处理,如果生产者追到消费者,那么生产者会停止生产,阻塞,等待它完成;同样的,消费者消费完了所有数据,消费者还未来得及消费,前者一样阻塞等待。
disruptor采用的就是ringbuffer数据结构,之所以采用这种结构,官网是这样描述的:
之所以采用ringbuffer,是因为它在可靠消息传递方面有很好的性能。这就够了,不过它还有一些其他的优点。
首先,因为它是数组,所以要比链表快,而且有一个容易预料到的数据访问的内存地址。这是对CPU缓存友好的—也就是说,在硬件级别,数组中的元素是会被预加载的,因此在ringbuffer当中,cpu无需时不时去主存加载数组中的下一个元素。(校对注:因为只要一个元素被加载到缓存行,其他相邻的几个元素也会被加载进同一个缓存行)
其次,你可以为数组预先分配内存,使得数组对象一直存在(除非程序终止)。这就意味着不需要花大量的时间用于垃圾回收。此外,不像链表那样,需要为每一个添加到其上面的对象创造节点对象—对应的,当删除节点时,需要执行相应的内存清理操作。
其他概念就不详细描述了,直接照例代码上。
引入包
1 2 3 4 5
| <dependency> <groupId>com.lmax</groupId> <artifactId>disruptor</artifactId> <version>3.3.6</version> </dependency>
|
Event类获取用于填充数据,data类型按需定义。
1 2 3 4 5 6 7 8 9 10 11
| public class MessageEvent { private List<Map<String, Object>> data;
public void setData(List<Map<String, Object>> data) { this.data = data; }
public List<Map<String, Object>> getData() { return data; } }
|
数据工厂类,实现EventFactory接口
1 2 3 4 5 6
| public class MessageEventFactory implements EventFactory<MessageEvent> { @Override public MessageEvent newInstance() { return new MessageEvent(); } }
|
生产者类
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
| public class MessageEventProducer { private static Logger logger = LoggerFactory.getLogger(MessageEventProducer.class);
private final RingBuffer<MessageEvent> ringBuffer;
public MessageEventProducer(RingBuffer<MessageEvent> ringBuffer) { this.ringBuffer = ringBuffer; }
public void onData(List<Map<String, Object>> datas) { long sequence = ringBuffer.next(); try { MessageEvent event = ringBuffer.get(sequence); event.setData(datas); } finally { ringBuffer.publish(sequence); } } }
|
消费者类,handler,具体逻辑数据类,实现EventHandler接口即可,实现其onEvent方法,可通过event.getData()拿到数据。
1 2 3 4 5 6 7
| public class MessageEventHandler implements EventHandler<MessageEvent> { public void onEvent(MessageEvent event, long sequence, boolean endOfBatch) { System.out.println("Event: " + event); } }
|
主类,启动类
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38
| public class MessageEventMain extends Thread { private static Logger logger = LoggerFactory.getLogger(MessageEventMain.class);
@Override public void run() { ThreadFactory threadFactory = Executors.defaultThreadFactory();
MessageEventFactory factory = new MessageEventFactory();
int bufferSize = 1024;
Disruptor<MessageEvent> disruptor = new Disruptor<>(factory, bufferSize, threadFactory, ProducerType.SINGLE, new BlockingWaitStrategy());
disruptor.handleEventsWith(new MessageHandler()); disruptor.handleEventsWith(new xxxxHandler());
disruptor.start();
RingBuffer<MessageEvent> ringBuffer = disruptor.getRingBuffer(); MessageEventProducer producer = new MessageEventProducer(ringBuffer); while (true) { try { List list = getData(); producer.onData(list); Thread.sleep(60 * 10 * 1000); } catch (Exception e) { e.printStackTrace(); } } } }
|
打完收工
另外讲几点:
1.handler会在producer.onData后对框架通知消费数据;
2.消费数据最好按最简单类型处理,性能更好,我上面用list实在是业务框定,所有后续造成了一个严重问题;
3.handler消费完对应ringbuffer数据并不会马上被清理,是直到生产者填充环形队列再到这个位置才会覆盖数据,所以问题2就是这样出现,如果定义复杂类型,一格存储大量数据,会操作jvm内存不够,程序运行一段数据会出现大量full fc或者oom,解决办法是,要么换成一条条数据发送,要么减小ringbuffer size,我这里业务限定,只能改小了ringbuffer size;
4.框架有阻塞等待机制,具体使用算法可以自行查,也就是说,如果某一个消费者太慢,生产者太快,到生产者追上消费者的那一格,生产者会阻塞,直到最慢的那个消费者消费完成,同样的,如果消费者很快,消费到了最新的一份数据,一样会阻塞等待生成者再次生成数据。所以这里要注意,如果某一个handler一直太慢,你可能要处理下这个handler效率,必要时撤掉,起码不能影响其他handler正常运行。