扩展Envoy
Enovy进程中运行着一系列的Inbound/Outbound监听器,分别代理外部发往工作负载、工作负载发往外部的网络流量。监听器的核心是过滤器链 ,链中每个过滤器都能够控制流量的处理流程。
扩展Envoy的主要方式就是开发新的过滤器。过滤器分为两个类别:
- 网络过滤器(L3/L4),是Envoy网络连接处理的核心
- HTTP过滤器(L7),由特殊的网络过滤器HttpConnectionManager管理,专门处理HTTP1/HTTP2/gRPC请求
根据行为的不同,网络过滤器分为:
- 读过滤器,当Envoy从下游连接接收到流量时调用
- 写过滤器,当Envoy准备向下游连接发送流量时调用
- 读/写过滤器,在上述两种情况下均调用
由于网络过滤器操控套接字的原始字节(外加少量事件,例如TLS握手完毕、连接断开),因此它的接口比较简单。
每个过滤器都可以中止迭代流程,并在未来继续后续过滤器的迭代。这种中止/继续迭代的机制,让实现复杂的需求成为可能,例如调用限速服务,异步的根据调用结果决定是否继续迭代。
网络过滤器之间可以在同一个下游连接的上下文内共享一些静态或动态数据。
L4过滤器的接口非常简单,总共只有4个方法。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 |
class ReadFilter { public: /** * 当连接上的数据被读取时调用 * @param data 读取到的,可能已经被修改过的数据 * @param end_stream 当连接启用半关闭语义时,用于提示是否到了最后一字节 * @return status 过滤器管理器使用此状态决定如何进一步迭代其它过滤器 */ virtual FilterStatus onData(Buffer::Instance& data, bool end_stream) PURE; /** * 当新连接刚创建时调用,过滤器链的迭代可以被中止 * @return status 过滤器管理器使用此状态决定如何进一步迭代其它过滤器 */ virtual FilterStatus onNewConnection() PURE; /** * 初始化用于和过滤器管理器交互的读过滤器回调,过滤器被注册时,将被过滤器管理器调用一次 * 任何需要用到底层连接的构造,需要在此函数的回调中执行 * * IMPORTANT: 出站、复杂逻辑不要在此,放在onNewConnection() * */ virtual void initializeReadFilterCallbacks(ReadFilterCallbacks& callbacks) PURE; } |
1 2 3 4 5 6 7 8 9 |
class WriteFilter { public: /** * 当在此连接上发生数据写入时调用 * @param data 需要写入的,可能已经被修改过的数据 * @param end_stream 当连接启用半关闭语义时,用于提示是否到了最后一字节 */ virtual FilterStatus onWrite(Buffer::Instance& data, bool end_stream) PURE; }; |
1 |
class Filter : public WriteFilter, public ReadFilter {}; |
Envoy提供了一个过滤器的Demon项目。我们基于这个项目来入门过滤器的开发。
构建此项目之前,注意将Bazel升级到0.23以上,否则你可能遇到错误:bazel parameter 'ctx' has no default value ...
参考如下命令完成示例项目的构建:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 |
# 签出源码 git clone https://github.com/envoyproxy/envoy-filter-example.git git submodule update --init cd envoy-filter-example # 根据你的构建环境选择适当的bazel选项 # bazel需要到Google下载部分软件包源码,可能需要代理 bazel build -c dbg --copt "-DENVOY_IGNORE_GLIBCXX_USE_CXX11_ABI_ERROR=1" //:envoy # 运行Envoy单元测试 bazel test -c dbg --copt "-DENVOY_IGNORE_GLIBCXX_USE_CXX11_ABI_ERROR=1" @envoy//test/... # 运行集成测试 bazel test -c dbg --copt "-DENVOY_IGNORE_GLIBCXX_USE_CXX11_ABI_ERROR=1" //:echo2_integration_test |
示例项目实现了名为Echo2的网络读过滤器,此过滤器的实现如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 |
#pragma once #include "envoy/network/filter.h" #include "common/common/logger.h" namespace Envoy { namespace Filter { // 实现接口 class Echo2 : public Network::ReadFilter, Logger::Loggable<Logger::Id::filter> { public: Network::FilterStatus onData(Buffer::Instance& data, bool end_stream) override; // 新连接到达后不做任何处理,继续调用下一个过滤器 Network::FilterStatus onNewConnection() override { return Network::FilterStatus::Continue; } // 初始化回调集 void initializeReadFilterCallbacks(Network::ReadFilterCallbacks& callbacks) override { read_callbacks_ = &callbacks; } private: Network::ReadFilterCallbacks* read_callbacks_{}; }; } // namespace Filter } // namespace Envoy namespace Envoy { namespace Filter { // 接收到下游发来的数据后,简单的记录日志 Network::FilterStatus Echo2::onData(Buffer::Instance& data, bool) { ENVOY_CONN_LOG(trace, "echo: got {} bytes", read_callbacks_->connection(), data.length()); // 并把收到的数据直接Echo给下游 read_callbacks_->connection().write(data, false); // 然后停止过滤器迭代,不调用它们 return Network::FilterStatus::StopIteration; } } // namespace Filter } // namespace Envoy |
每个过滤器都需要以一个独特的名称进行注册,否则Envoy无法知道它的存在,你也不能在配置文件中引用之。
Envoy过滤器的注册,一律通过模板化的静态变量Registry::RegisterFactory进行:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 |
#include <string> #include "echo2.h" #include "envoy/registry/registry.h" #include "envoy/server/filter_config.h" namespace Envoy { namespace Server { namespace Configuration { class Echo2ConfigFactory : public NamedNetworkFilterConfigFactory { public: // 没有配置 Network::FilterFactoryCb createFilterFactoryFromProto(const Protobuf::Message&, FactoryContext&) override { // 过滤器工厂回调,初始化过滤器链时,Envoy会调用此方法 // 通常你会在这里实例化过滤器,并添加到过滤器管理器中 return [](Network::FilterManager& filter_manager) -> void { filter_manager.addReadFilter(Network::ReadFilterSharedPtr{new Filter::Echo2()}); }; } // 创建空白的过滤器配置Proto消息对象 // 任何过滤器的配置,均以不透明的google.protobuf.Struct类型传递,并被转换为JSON、解析,然后填充到此Proto对象 ProtobufTypes::MessagePtr createEmptyConfigProto() override { return ProtobufTypes::MessagePtr{new Envoy::ProtobufWkt::Empty()}; } // 过滤器的独特名称,很重要 std::string name() override { return "echo2"; } }; /** * 静态的注册 */ static Registry::RegisterFactory<Echo2ConfigFactory, NamedNetworkFilterConfigFactory> registered_; } } } |
Registry::RegisterFactory是一个模板,在这个例子中,它会创建一个Echo2ConfigFactory,并在调用FactoryRegistry<NamedNetworkFilterConfigFactory>::registerFactory时传入它:
1 2 3 4 5 6 7 |
template <class T, class Base> class RegisterFactory { public: RegisterFactory() { FactoryRegistry<Base>::registerFactory(instance_); } private: T instance_{}; }; |
可以使用如下的Envoy配置文件:
1 2 3 4 5 6 7 8 9 10 |
static_resources: listeners: name: listener_0 address: socket_address: address: 127.0.0.1 port_value: 15001 filter_chains: - filters: - name: echo2 |
启动Envoy的命令行参数:
1 |
bazel-bin/envoy -c echo2_server.yaml -l trace |
使用Telnet登陆,然后可以输入文字并回车,Envoy会回响你的输入:
1 |
telnet 127.0.0.1 15001 |
HTTP过滤器类似于网络过滤器,也是形成一个栈。HTTP过滤器栈由HttpConnectionManager管理,HttpConnectionManager是一个L4过滤器。
根据行为的不同,HTTP过滤器分为:
- 解码器(Decoder),当HTTP连接管理器解码请求流的一部分(头、体、尾)时调用
- 编码器(Encoder),当HTTP连接管理器准备编码响应流的一部分(头、体、尾)时调用
- 编解码器,在上述两种情况下均调用
需要注意,HTTP过滤器操作的对象是流,而不是连接:
- 对于HTTP1.1,在任意时间点每个连接上最多有一个流
- 对于HTTP2或者gRPC,实现了连接的多路复用,允许多个流同时依托于单个L4连接
HTTP过滤器接口屏蔽了L4协议的细节。和L4过滤器一样,HTTP过滤器也能够中止、继续过滤器迭代,各HTTP过滤器同样可以在同一个请求流的上下文内共享一些静态或动态数据。
L7过滤器的类图如下:
HTTP流编解码器公共的父接口:
1 2 3 4 5 6 7 8 9 10 |
class StreamFilterBase { public: /** * 当过滤器将要被销毁时调用,销毁可能在流正常结束后,或者因为RESET提前发生 * * 任何过滤器都应在此方法中确保,所有异步事件 —— 例如定时器、网络调用 —— 被清理干净 */ virtual void onDestroy() PURE; }; |
HTTP流解码器,负责处理下游发来的请求:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 |
class StreamDecoderFilter { public: /** * 处理已经被http_parser解析好的请求头 * @param headers 请求头的映射 * @param end_stream 提示当前流是否header-only的 * @return FilterHeadersStatus 用于确定是否继续迭代后续过滤器 */ virtual FilterHeadersStatus decodeHeaders(HeaderMap& headers, bool end_stream) PURE; /** * 处理已经被http_parser解析好的数据帧 * @param data 存放数据帧的缓冲区 * @param end_stream 提示当前是否最后一个数据帧 * @return FilterDataStatus 用于确定是否继续迭代后续过滤器 */ virtual FilterDataStatus decodeData(Buffer::Instance& data, bool end_stream) PURE; /** * 处理已经被http_parser解析好的请求尾,隐含end_stream = true * @param trailers supplies the decoded trailers. */ virtual FilterTrailersStatus decodeTrailers(HeaderMap& trailers) PURE; /** * 过滤器管理器调用此方法来初始化解码回调集 */ virtual void setDecoderFilterCallbacks(StreamDecoderFilterCallbacks& callbacks) PURE; }; |
HTTP流编码器,可以处理准备发给下游的响应:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 |
class StreamEncoderFilter : public StreamFilterBase { public: /* * 当配置Envoy,让其代理(通常不会配置)Expect:100-continue请求, * 并且当前请求指定了Expect:100-continue时,会调用此方法 */ virtual FilterHeadersStatus encode100ContinueHeaders(HeaderMap& headers) PURE; /** * 处理响应头 */ virtual FilterHeadersStatus encodeHeaders(HeaderMap& headers, bool end_stream) PURE; /** * 处理响应体 */ virtual FilterDataStatus encodeData(Buffer::Instance& data, bool end_stream) PURE; /** * 处理响应尾,隐含end_stream = true */ virtual FilterTrailersStatus encodeTrailers(HeaderMap& trailers) PURE; /** * 处理元数据,新的元数据应该直接存入metadata_map * 不要通过StreamDecoderFilterCallbacks::encodeMetadata()来添加元数据 * */ virtual FilterMetadataStatus encodeMetadata(MetadataMap& metadata_map) PURE; /** * 滤器管理器调用此方法来初始化编码回调集 */ virtual void setEncoderFilterCallbacks(StreamEncoderFilterCallbacks& callbacks) PURE; }; |
1 |
class StreamFilter : public virtual StreamDecoderFilter, public virtual StreamEncoderFilter {}; |
Envoy提供的envoy-filter-example示例项目中也提供了一个HTTP过滤器, 其代码存放在http-filter-example目录下。这是一个解码过滤器,它会为下游请求添加一个请求头。
参考如下命令完成示例项目的构建:
1 |
bazel build -c dbg --copt "-DENVOY_IGNORE_GLIBCXX_USE_CXX11_ABI_ERROR=1" //http-filter-example:envoy |
过滤器配置、过滤器类的声明:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 |
#pragma once #include <string> #include "envoy/server/filter_config.h" #include "http-filter-example/http_filter.pb.h" namespace Envoy { namespace Http { // 过滤器的配置对象 class HttpSampleDecoderFilterConfig { public: // 构造函数,配置对象的入参是Proto消息,sample::Decoder依据你写的Proto文件自动生成 HttpSampleDecoderFilterConfig(const sample::Decoder& proto_config); const std::string& key() const { return key_; } const std::string& val() const { return val_; } private: const std::string key_; const std::string val_; }; typedef std::shared_ptr<HttpSampleDecoderFilterConfig> HttpSampleDecoderFilterConfigSharedPtr; // 过滤器声明 class HttpSampleDecoderFilter : public StreamDecoderFilter { public: HttpSampleDecoderFilter(HttpSampleDecoderFilterConfigSharedPtr); ~HttpSampleDecoderFilter(); // 需要实现Http::StreamFilterBase的方法 void onDestroy() override; // 需要实现Http::StreamDecoderFilter的方法 FilterHeadersStatus decodeHeaders(HeaderMap&, bool) override; FilterDataStatus decodeData(Buffer::Instance&, bool) override; FilterTrailersStatus decodeTrailers(HeaderMap&) override; void setDecoderFilterCallbacks(StreamDecoderFilterCallbacks&) override; private: const HttpSampleDecoderFilterConfigSharedPtr config_; StreamDecoderFilterCallbacks* decoder_callbacks_; const LowerCaseString headerKey() const; const std::string headerValue() const; }; } // namespace Http } // namespace Envoy |
Proto文件:
1 2 3 4 5 6 7 8 9 10 |
syntax = "proto3"; package sample; import "validate/validate.proto"; message Decoder { string key = 1 [(validate.rules).string.min_bytes = 1]; string val = 2 [(validate.rules).string.min_bytes = 1]; } |
BUILD文件中的规则http_filter_proto负责从Proto文件生成C++代码。
过滤器的实现如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 |
#include <string> #include "http_filter.h" #include "envoy/server/filter_config.h" namespace Envoy { namespace Http { // 配置对象的构造函数,入参是Proto对象 HttpSampleDecoderFilterConfig::HttpSampleDecoderFilterConfig(const sample::Decoder& proto_config) : key_(proto_config.key()), val_(proto_config.val()) {} // 过滤器的构造函数,入参是配置对象 HttpSampleDecoderFilter::HttpSampleDecoderFilter(HttpSampleDecoderFilterConfigSharedPtr config): config_(config) {} HttpSampleDecoderFilter::~HttpSampleDecoderFilter() {} void HttpSampleDecoderFilter::onDestroy() {} const LowerCaseString HttpSampleDecoderFilter::headerKey() const { return LowerCaseString(config_->key()); } const std::string HttpSampleDecoderFilter::headerValue() const { return config_->val(); } FilterHeadersStatus HttpSampleDecoderFilter::decodeHeaders(HeaderMap& headers, bool) { // 添加一个请求头 headers.addCopy(headerKey(), headerValue()); // 设置响应体 auto body_text = fmt::format("{}:{}", headerKey().get(), headerValue().c_str()); // 添加一个响应头 auto modify_headers = [this](HeaderMap& headers) -> void { headers.addCopy(headerKey(), headerValue()); }; decoder_callbacks_->sendLocalReply(Http::Code::OK, body_text, modify_headers, absl::nullopt); // 本地响应已经发送,必须停止迭代,否则执行到Router会出现断言失败 return FilterHeadersStatus::StopIteration; } // 如果不增加任何逻辑,简单返回Continue即可 FilterDataStatus HttpSampleDecoderFilter::decodeData(Buffer::Instance&, bool) { return FilterDataStatus::Continue; } FilterTrailersStatus HttpSampleDecoderFilter::decodeTrailers(HeaderMap&) { return FilterTrailersStatus::Continue; } void HttpSampleDecoderFilter::setDecoderFilterCallbacks(StreamDecoderFilterCallbacks& callbacks) { decoder_callbacks_ = &callbacks; } } // namespace Http } // namespace Envoy |
配置工厂,能够创建过滤器工厂,过滤器工厂FilterFactoryCb本质上就是一个函数,Envoy调用它来创建过滤器。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 |
#include <string> #include "http_filter.h" #include "common/config/json_utility.h" #include "envoy/registry/registry.h" #include "http-filter-example/http_filter.pb.h" #include "http-filter-example/http_filter.pb.validate.h" namespace Envoy { namespace Server { namespace Configuration { class HttpSampleDecoderFilterConfigFactory : public NamedHttpFilterConfigFactory { public: // 从JSON配置创建过滤器工厂 Http::FilterFactoryCb createFilterFactory(const Json::Object& json_config, const std::string&, FactoryContext& context) override { sample::Decoder proto_config; // 将JSON配置转化为Proto配置 translateHttpSampleDecoderFilter(json_config, proto_config); return createFilter(proto_config, context); } // V2 API的createFilterFactory变体,过滤器配置以Proto消息的形式传入,目前可以不实现此方法 // 未来V1 API废弃后,必须实现 Http::FilterFactoryCb createFilterFactoryFromProto(const Protobuf::Message& proto_config, const std::string&, FactoryContext& context) override { return createFilter(Envoy::MessageUtil::downcastAndValidate<const sample::Decoder&>(proto_config), context); } // 新的空白配置Proto ProtobufTypes::MessagePtr createEmptyConfigProto() override { return ProtobufTypes::MessagePtr{new sample::Decoder()}; } // 独特的名称 std::string name() override { return "sample"; } private: Http::FilterFactoryCb createFilter(const sample::Decoder& proto_config, FactoryContext&) { // 将Proto对象转化为配置对象 Http::HttpSampleDecoderFilterConfigSharedPtr config = std::make_shared<Http::HttpSampleDecoderFilterConfig>( Http::HttpSampleDecoderFilterConfig(proto_config)); return [config](Http::FilterChainFactoryCallbacks& callbacks) -> void { auto filter = new Http::HttpSampleDecoderFilter(config); // 添加此过滤器,注意L7过滤器由过滤器链工厂管理,每个监听器可以有多个过滤器链 callbacks.addStreamDecoderFilter(Http::StreamDecoderFilterSharedPtr{filter}); }; } void translateHttpSampleDecoderFilter(const Json::Object& json_config, sample::Decoder& proto_config) { JSON_UTIL_SET_STRING(json_config, proto_config, key); JSON_UTIL_SET_STRING(json_config, proto_config, val); } }; // 静态注册,类似于L4过滤器 static Registry::RegisterFactory<HttpSampleDecoderFilterConfigFactory, NamedHttpFilterConfigFactory> register_; } // namespace Configuration } // namespace Server } // namespace Envoy |
可以使用如下的Envoy配置文件:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 |
static_resources: listeners: name: listener_0 address: socket_address: address: 127.0.0.1 port_value: 15001 filter_chains: - filters: - name: envoy.http_connection_manager config: stat_prefix: sample route_config: name: gmem virtual_hosts: - name: gmem domains: ["*"] routes: - match: prefix: "/" route: cluster: gmem http_filters: - name: sample config: key: via val: sample-filter - name: envoy.router clusters: - name: gmem connect_timeout: 1s type: STRICT_DNS dns_lookup_family: V4_ONLY lb_policy: ROUND_ROBIN hosts: - socket_address: address: gmem.cc port_value: 80 |
使用curl来测试:
1 2 3 4 5 6 7 8 9 10 11 12 |
curl -D - http://127.0.0.1:15001 HTTP/1.1 200 OK content-length: 17 content-type: text/plain # 添加的响应头 via: sample-filter date: Tue, 30 Apr 2019 03:44:45 GMT server: envoy # 设置的响应体 via:sample-filter |
Leave a Reply