FunTester Aeron 框架初探

FunTester · 2024年12月05日 · 2095 次阅读

Aeron 是什么

Aeron 是一款开源的高性能消息传递框架,专为低延迟和高吞吐场景设计。它被广泛应用于金融、游戏、分布式系统等需要快速通信的领域。Aeron 的核心优势在于通过零拷贝技术和直接内存访问,最大限度地降低消息传递的延迟,同时利用高效的网络协议实现数据的可靠传输。

其架构包括媒体驱动器和客户端 API,支持单播、广播和进程间通信。Aeron 还提供持久化模块(Aeron Archive),便于消息流的存储与回放,满足日志重放等需求。

Aeron 支持 Java、C++、C# 等多语言,适配多种场景,如分布式日志复制、高频交易等。开发者可灵活选择嵌入式或独立部署模式,以满足不同应用需求。凭借极低延迟、强稳定性和简洁的编程模型,Aeron 成为高性能通信领域的佼佼者,是构建实时系统的理想选择。

Aeron 的优势

选择 Aeron 的理由在于其专注于极致性能和低延迟的优化。相比传统消息中间件如 Kafka 和 RabbitMQ,Aeron 的零拷贝技术和直接内存操作将延迟降至微秒级,满足高频交易、实时数据分发等对响应速度极为敏感的场景。此外,Aeron 通过灵活的单播、多播及进程间通信模式,提供更简单的部署方式和开发体验,避免了传统中间件依赖复杂分布式管理工具的麻烦。

与 Kafka 注重吞吐和持久化,RabbitMQ 强调功能丰富不同,Aeron 更倾向于实时性需求,专为高性能任务而生。它不仅能稳定处理高并发流量,还具备消息回放功能,兼顾了性能与可靠性,成为构建实时应用的优选。

基础概念

Aeron 架构组成

Media Driver:Media Driver 是 Aeron 的通信核心,负责底层数据的传输处理,包括进程内(IPC)和网络间(UDP)的通信操作。它利用直接内存和内存映射文件,实现高效的消息传递,同时支持单播和多播模式,满足不同场景的需求。

Client API:Client API 是 Aeron 面向开发者的高级接口,简化了消息发布和订阅的操作流程。通过它,开发者可以轻松创建 Publication 和 Subscription,专注于业务逻辑,而无需处理复杂的底层实现。

Log Buffer:Log Buffer 是共享内存区域,用于存储发布的消息数据,供订阅者访问。它基于环形缓冲区设计,确保高效的消息写入和读取,同时支持流量控制和重传功能。

核心术语

  • Publication 和 Subscription:Publication 是消息的发布端,负责将数据写入 Log Buffer;Subscription 是消息的订阅端,从 Log Buffer 读取数据,形成 Aeron 的发布 - 订阅模型核心。
  • Channel:Channel 是消息通信的介质,指定消息的传输方式,例如进程内(IPC)或跨主机的网络通信(UDP)。它是连接发布者和订阅者的桥梁。
  • Stream ID:Stream ID 用于标识同一 Channel 内的不同逻辑消息流,允许多个独立的数据流共用一个通信通道,提高资源利用率。
  • Session ID:Session ID 用于区分同一 Stream ID 内不同发布者的会话。即使多个发布者共享一个逻辑流,每个会话的数据都能独立追踪。

    传输方式

  • IPC:IPC(进程间通信)是 Aeron 的最快通信模式,依托共享内存直接传输消息,适合部署在同一主机内的高性能应用场景。

  • UDP:UDP 支持主机间的网络通信,是 Aeron 的跨网络传输方式。它通过单播或多播发送数据,保证了低延迟和高扩展性,适合分布式环境。

code 实践

创建 MediaDriver

创建服务端比较简单,如下:

// 创建并启动不带存档功能的
MediaDriverMediaDriver mediaDriver = MediaDriver.launch();  
System.out.println("Aeron Archive Server is running...");

在我查询资料的过程中,有一些教程会在这行代码后面加上无限休眠来保障 MediaDriver 的运行。在我实际测试当中,并不会发生代码执行完就终止的情况,可能是早期版本的设计差异导致,各位在使用当中可以以实际测试结果为准。

创建 Publisher

相对来说复杂一些,但是照着官方的教程还是比较容易写个 Demo 出来的。PS:官方教程会将各个步骤单独拿出来演示,并说明作用。没有耐心的可以直接源码仓库找 Demo。

import io.aeron.Aeron;  
import io.aeron.ConcurrentPublication;  
import io.aeron.driver.MediaDriver;  
import org.agrona.concurrent.IdleStrategy;  
import org.agrona.concurrent.SleepingIdleStrategy;  
import org.agrona.concurrent.UnsafeBuffer;  

public class ArchivePublisher {  

    public static void main(String[] args) throws InterruptedException {  
        // 创建并启动不带存档功能的 MediaDriver        MediaDriver mediaDriver = MediaDriver.launch();  
        System.out.println("Aeron Archive Server is running...");  
        // 创建一个空闲策略,用于在没有数据时进行空闲  
        IdleStrategy idleStrategy = new SleepingIdleStrategy();  
        // 创建Aeron上下文  
        Aeron.Context ctx = new Aeron.Context();  
        // 创建Aeron实例  
        Aeron aeron = Aeron.connect(ctx);  
        // 声明 channel 和 streamId        String channel = "aeron:udp?endpoint=localhost:40123";  
        int streamId = 10;  
        ConcurrentPublication publication = aeron.addPublication(channel, streamId);// 创建独占发布者,作用是确保只有一个发布者,避免多个发布者同时发布数据  
        // 等待发布者连接  
        while (!publication.isConnected()) {  
            idleStrategy.idle();  
        }  
        System.out.println("Publication connected");  
        for (int i = 0; i < 10000; i++) {  
            Thread.sleep(1000);  
            String s = "Hello World! From SDET!" + i;  
            byte[] bytes = s.getBytes();  
            UnsafeBuffer buffer = new UnsafeBuffer(bytes);  
            while (publication.offer(buffer) < 0) {  
                idleStrategy.idle();  
            }  
            System.out.println("Published message: " + s);  
        }  
    }  

}

创建 Subscription

同上,建议大家去直接官方源码中获取完成的 Demo。

import io.aeron.Aeron;  
import io.aeron.Subscription;  
import org.agrona.concurrent.BackoffIdleStrategy;  
import org.agrona.concurrent.IdleStrategy;  

public class AeronSubscription {  
    public static void main(String[] args) {  
        // 创建Aeron上下文  
        Aeron.Context aeronCtx = new Aeron.Context();  
        // 创建Aeron实例  
        Aeron aeron = Aeron.connect(aeronCtx);  
        // 创建订阅,并指定channel和streamId  
        Subscription subscription = aeron.addSubscription("aeron:udp?endpoint=localhost:40123", 10);  
        // 创建空闲策略  
        IdleStrategy idleStrategy = new BackoffIdleStrategy();  
        while (true) {  
            // 从订阅中获取数据  
            int fragments = subscription.poll((buffer, offset, length, header) -> {  
                // 创建一个字节数组,用于存放数据  
                byte[] data = new byte[length];  
                buffer.getBytes(offset, data);// 将数据从buffer中读取到data中  
                System.out.println(buffer);// 打印buffer  
                System.out.println(offset);// 打印offset  
                System.out.println(header.position());// 打印position  
                System.out.println(header.termId());// 打印termId  
                System.out.println(header.sessionId());// 打印sessionId  
                System.out.println("Received message: " + new String(data));  
                System.out.println("------------------------");  
            }, 10);  
            idleStrategy.idle(fragments);  
        }  
    }  
}

Aeron 性能为何这么高

Aeron 的高性能得益于多个设计上的优化,特别是在内存管理、网络传输和延迟控制等方面。以下是其主要的性能优势:

零拷贝内存管理

Aeron 的一个关键特性是采用了零拷贝 (Zero-Copy) 技术,特别是在数据传输和存储中。数据通过直接在内存中传输,而不是进行昂贵的拷贝操作,避免了 CPU 和内存之间不必要的数据移动。通过 Direct Memory,Aeron 可以直接从用户空间写入到内核缓冲区,减少了数据复制和上下文切换的开销。

高效的共享内存模型

Aeron 使用了 共享内存 (Shared Memory) 模型来进行进程间通信。这种设计使得多个进程能够直接读写共享的内存区域,而不需要经过传统的操作系统缓冲区或磁盘文件。这种方式显著降低了延迟,提高了数据传输速度。对于同一台机器上的进程间通信(IPC),Aeron 通过共享内存提供几乎为零的延迟。

面向流的架构

Aeron 将通信设计成流(Stream)模型,通过 Stream ID 来标识不同的数据流。每个流都拥有独立的缓冲区和流控制机制,这种设计提高了消息的传输效率,并降低了多个流之间的相互干扰。每个流的消息都能独立处理,优化了吞吐量和并发性能。

低延迟网络协议

Aeron 支持 UDP 协议,允许在不同机器间进行快速数据传输。相较于基于 TCP 的传统消息中间件,UDP 更轻量,减少了协议栈中的处理步骤,进一步降低了延迟。同时,Aeron 内部实现了自己的 flow control 和 error recovery 机制,确保即使在高负载的情况下也能保证消息的有序和可靠传输。

高效的传输机制

Aeron 最大程度地减少了上下文切换、内存分配和同步锁等开销。它通过使用单线程模型来减少多线程上下文切换带来的开销,同时利用现代硬件的多核架构进行高效的数据处理。

在 Aeron 中,发布者(Publication)和订阅者(Subscription)的设计使得消息可以以高效的方式传递。在发布和订阅的过程中,Aeron 通过直接内存映射和环形缓冲区实现了低延迟的数据传递,不会像传统的消息中间件那样存在大量的队列或缓冲区复制。

FunTester 原创精华
如果觉得我的文章对您有用,请随意打赏。您的支持将鼓励我继续创作!
暂无回复。
需要 登录 后方可回复, 如果你还没有账号请点击这里 注册