本文由云 + 社区发表

作者:皮皮熊

概述

Apache Flume 是一个用于高效地从大量异构数据源收集、聚合、传输到一个集中式数据存储的分布式、高可靠、高可用的系统。

Apache Flume 是 Apache 基金会的顶级项目。现在有两个代码版本线可以获取:0.9.x 和 1.x。本文档对应的是 1.x 版本。

数据流模型

Event 是流经 flume agent 的最小数据单元。一个 Event(由 Event 接口实现) 从 source 流向 channel,再到 sink。Event 包含了一个 payload(byte array) 和可选的 header(string attributes)。一个 flume agent 就是一个 jvm 下的进程:控制着 Events 从一个外部的源头到一个外部的目的地。

Source 消费着具有特殊格式的 Events(这些 Event 传递到 Source 通过像 Web server 这样外在的数据源)。例如 AvroSource 可以被用于接收 Avro 的 Events,从本客户端或者其他运行中的 flume 客户端。当一个 Source 接收到一个 Event,它会把它插入到一个或者多个 Channel 里。Channel 会被动地存储这些 Event 直到它们被一个 Sink 消费到。Flume 中一种 Channel 是 FileChannel,其使用文件系统来作为后端存储。Sink 需要负责任地将一个 Event 从 Channel 中移除,并将其放入像 hdfs 一样的外部存储系统 (例如 HDFSEventSink),或者转发到传输中下一个节点的 source 中。Source 和 Sink 在 agent 中异步地交互 Channel 中的 Event。

可靠性

Event 是存储在 Flume agent 的 Channel 里。Sink 的责任就是传输 Event 到下一个 agent 或者最终的存储系统 (像 hdfs)。Sink 只有当 Event 写入下一个 agent 的 Channel 或者 存储到最终的系统时才会从 channel 里面删掉 Event。这就是 Flume 如何在单跳消息传输中提供端到端的可靠性。Flume 提供了一个事务性的方法来修复可靠传输中的 Event。Source 和 Sink 包含了 Event 的存储和重试 (通过由 channel 提供的事务)。

构建 Flume

获取源码

通过 git

编译/测试 Flume

Flume 使用 maven 来 build。你可以通过标准的 maven 命令行来编译 Flume。

  1. 仅编译:mvn clean compile
  2. 编译且运行单元测试:mvn clean test
  3. 运行独立的测试:mvn clean test -Dtest=,,... -DfailIfNoTests=false
  4. 打包:mvn clean install
  5. 打包 (忽略单元测试):mvn clean install -DskipTests

注意:Flume build 需要在 path 中有 Google Protocol Buffers 编译器。

更新 Protocol Buffer 版本

File channel 依赖 Protocol Buffer。当你想更新 Protocol Buffer 版本时,你需要如下更新使用到 Protocol Buffer 的 data access 类:

  1. 本机安装你想要的 PB 版本
  2. 更新 pom.xml 中 PB 的版本
  3. 生成 flume 中新的 PB data access 类:cd flume-ng-channels/flume-file-channel; mvn -P compile-proto clean package -DskipTests
  4. 在所有生成文件中加上 Apache license(如果缺了的话)
  5. rebuild 及测试 Flume:cd ../..; mvn clean install

开发自定义部分

client

Client 在 Event 产生时运转,并将他们传递到 Flume 的 agent。Client 通常运行在应用消费数据的进程空间中。Flume 目前支持 Avro, log4j, syslog, 以及 Http POST (with a JSON body) 方式从外部数据源传输数据。同时 ExecSource 支持将本地进程的输出作为 Flume 的输入。

可能已有的方案是不够的。本案例中你可以使用自定义的方法来向 flume 发送数据。这里有两种方法来实现。第一:写一个自定义的客户端来和 flume 已有的 source 交互,像 AvroSource 或者 SyslogTcpSource。此时 Client 需要将数据转换成这些 Source 能理解的 message。另外一个方案:写一个自定义的 Flume Source,通过 IPC 或者 RPC,直接地和已有的 client 应用通信 (需要将 client 的数据转换成 Flume 的 Event)。注意这些存储在 flume agent channel 中的事件,必须以 Flume Event 形式存在。

Client SDK

尽管 Flume 包含了一系列内置的,用于接收数据的方法 (即 Source),人们常常想直接地通过 flume 和自定义的程序进行通信。Flume SDK 就是这样一个 lib,它可以通过 RPC 直接地连接到 Flume,并且发送到 Flume 的数据流。

RPC 客户端接口

一个 RPC 客户端接口的实现,包含了支持 Flume 的 RPC 方法。用户的程序可以简单地调用 Flume SDK 客户端的 append(Event) 或者 appendBatch(List) 接口来发送数据,而不用考虑消息交互的细节。用户可以通过使用诸如 SimpleEvent 类,或者使用 EventBuilder 的 静态 helper 方法 withBody(),便捷地实现直接提供事件接口所需的事件 ARG。

Transaction(事务) 接口

Transaction 接口是 Flume 可靠性的基础。所有主要组件(即 source,sink 和 channel)必须使用 Flume Transaction。

Transaction 在 channel 的实现中实现。每个 source 和 sink 连接到 channel 时必须要得到一个 channnel 的对象。Source 使用 channnelprocessor 来管理 transaction。sink 明确地通过他们配置的 channel 来管理 transaction。存储一个事件 (把他们放入 channnel 中) 或者抽取一个事件 (从 channnel 中取出) 在一个激活的 transaction 中完成。例如:

Channel ch = new MemoryChannel();
Transaction txn = ch.getTransaction();
txn.begin();
try {
  // This try clause includes whatever Channel operations you want to do

  Event eventToStage = EventBuilder.withBody("Hello Flume!",
                       Charset.forName("UTF-8"));
  ch.put(eventToStage);
  // Event takenEvent = ch.take();
  // ...
  txn.commit();
} catch (Throwable t) {
  txn.rollback();

  // Log exception, handle individual exceptions as needed

  // re-throw all Errors
  if (t instanceof Error) {
    throw (Error)t;
  }
} finally {
  txn.close();
}

在这里,我们从 channel 获取 transaction。在 begin()返回后,Transaction 现在处于活动/打开状态,然后将 Event 放入 Channel 中。如果 put 成功,则提交并关闭 Transaction。

Sink

Sink 的目的就是从 Channel 中提取事件并将其转发到传输中的下一个 Flume Agent 或将它们存储在外部存储库中。根据 Flume 属性文件中的配置,接收器只与一个通道关联。每个已配置的 Sink 都有一个 SinkRunner 实例,当 Flume 框架调用 SinkRunner.start()时,会创建一个新线程来驱动 Sink(使用 SinkRunner.PollingRunner 作为线程的 Runnable),该线程管理 Sink 的生命周期。Sink 需要实现 start()和 stop()方法作为 LifecycleAware 接口的一部分。

Sink 实现还需要实现 Configurable 接口来处理自己的配置设置。例如:

public class MySink extends AbstractSink implements Configurable {
  private String myProp;

  @Override
  public void configure(Context context) {
    String myProp = context.getString("myProp", "defaultValue");

    // Process the myProp value (e.g. validation)

    // Store myProp for later retrieval by process() method
    this.myProp = myProp;
  }

  @Override
  public void start() {
    // Initialize the connection to the external repository (e.g. HDFS) that
    // this Sink will forward Events to ..
  }

  @Override
  public void stop () {
    // Disconnect from the external respository and do any
    // additional cleanup (e.g. releasing resources or nulling-out
    // field values) ..
  }

  @Override
  public Status process() throws EventDeliveryException {
    Status status = null;

    // Start transaction
    Channel ch = getChannel();
    Transaction txn = ch.getTransaction();
    txn.begin();
    try {
      // This try clause includes whatever Channel operations you want to do

      Event event = ch.take();

      // Send the Event to the external repository.
      // storeSomeData(e);

      txn.commit();
      status = Status.READY;
    } catch (Throwable t) {
      txn.rollback();

      // Log exception, handle individual exceptions as needed

      status = Status.BACKOFF;

      // re-throw all Errors
      if (t instanceof Error) {
        throw (Error)t;
      }
    }
    return status;
  }
}

Source

Source 的目的是从外部客户端接收数据并将其存储到已配置的 Channels 中。Source 可以获取其自己的 ChannelProcessor 的实例来处理在 Channel 本地事务中提交的串行事件。在 exception 的情况下,需要 Channels 传播异常,则所有 Channels 将回滚其事务,但先前在其他 Channel 上处理的事件将保持提交。

与 SinkRunner.PollingRunner Runnable 类似,有一个 PollingRunner Runnable,它在 Flume 框架调用 PollableSourceRunner.start()时创建的线程上执行。每个配置的 PollableSource 都与自己运行 PollingRunner 的线程相关联。该线程管理 PollableSource 的生命周期,例如启动和停止。

注意,实际上有两种类型的 Source:已经提到过 PollableSource,另一个是 EventDrivenSource。与 PollableSource 不同,EventDrivenSource 必须有自己的回调机制,捕获新数据并将其存储到 Channel 中。EventDrivenSources 并不像 PollableSources 那样由它们自己的线程驱动。下面是一个自定义 PollableSource 的示例:

public class MySource extends AbstractSource implements Configurable, PollableSource {
  private String myProp;

  @Override
  public void configure(Context context) {
    String myProp = context.getString("myProp", "defaultValue");

    // Process the myProp value (e.g. validation, convert to another type, ...)

    // Store myProp for later retrieval by process() method
    this.myProp = myProp;
  }

  @Override
  public void start() {
    // Initialize the connection to the external client
  }

  @Override
  public void stop () {
    // Disconnect from external client and do any additional cleanup
    // (e.g. releasing resources or nulling-out field values) ..
  }

  @Override
  public Status process() throws EventDeliveryException {
    Status status = null;

    try {
      // This try clause includes whatever Channel/Event operations you want to do

      // Receive new data
      Event e = getSomeData();

      // Store the Event into this Source's associated Channel(s)
      getChannelProcessor().processEvent(e);

      status = Status.READY;
    } catch (Throwable t) {
      // Log exception, handle individual exceptions as needed

      status = Status.BACKOFF;

      // re-throw all Errors
      if (t instanceof Error) {
        throw (Error)t;
      }
    } finally {
      txn.close();
    }
    return status;
  }
}

参考自(Flume 1.8.0 Developer Guide)

flume 1.8.0 文档完整翻译可见 https://blog.csdn.net/u013128262

此文已由腾讯云 + 社区在各渠道发布

获取更多新鲜技术干货,可以关注我们腾讯云技术社区 - 云加社区官方号及知乎机构号


↙↙↙阅读原文可查看相关链接,并与作者交流