1、Pipeline定义
在Elasticsearch 5.0版本以后引入了ingest node,用于在文档被索引之前进行预处理。
Pipeline 定义了一系列按顺序执行的 processors, 一个 pipeline 由 description 和 processors两部分组成:
PUT _ingest/pipeline/my-pipeline-id
{
"description" : "...",
"processors" : [ ... ]
}
2、Pipeline获取文档信息
2.1 pipeline中的 processors 可以获取到传入的文档的原始数据以及元数据,并进行读写操作。获取单个字段可以直接使用字段名:
{
"set": {
"field": "my_field",
"value": 582.1
}
}
或者使用_source前缀:
{
"set": {
"field": "_source.my_field",
"value": 582.1
}
}
2.2 pipeline同样可以访问文档中的元信息,_index、_type、_id、_routing等。
{
"set": {
"field": "_id",
"value": "1"
}
}
2.3 pipeline中还可以获取Ingest 的元信息,ingest的元数据信息是只存在与文档预处理期间,当文档被pipeline处理完成,ingest元信息也就消息了。
{
"set": {
"field": "received",
"value": "{{_ingest.timestamp}}"
}
}
此处使用了{{}},这种方式同样可以作为模板获取文档内的字段,比如下面field_c的内容是field_a与field_b连接。
{
"set": {
"field": "field_c",
"value": "{{field_a}} {{field_b}}"
}
}
在pipeline中还支持使用动态字段,如下将service字段的值当做字段名,将code的值当做字段值:
{
"set": {
"field": "{{service}}",
"value": "{{code}}"
}
}
3、使用Simulate Pipeline API
在定义了pipeline之后,可以使用Simulate Pipeline API 对文档进行预处理执行测试。
POST /_ingest/pipeline/my-pipeline-id/_simulate
{
"docs": [
{
"_index": "index",
"_id": "id",
"_source": {
"foo": "bar"
}
},
{
"_index": "index",
"_id": "id",
"_source": {
"foo": "rab"
}
}
]
}
POST /_ingest/pipeline/_simulate
{
"pipeline" :
{
"description": "_description",
"processors": [
{
"set" : {
"field" : "field2",
"value" : "_value"
}
}
]
},
"docs": [
{
"_index": "index",
"_id": "id",
"_source": {
"foo": "bar"
}
},
{
"_index": "index",
"_id": "id",
"_source": {
"foo": "rab"
}
}
]
}
4、常用processors类型详解
具体详解请看官方文档:https://www.elastic.co/guide/en/elasticsearch/reference/7.9/ingest-processors.html
此处主要说明几个关键常用。
4.1 Append Processor
该Processor追加一个或者多个值到现存的数组字段中,如果字段本身是标量则将其转换成数组再添加,如果字段不存在则创建包含该值的数字。
{
"append": {
"field": "tags",
"value": ["production", "{{app}}", "{{owner}}"]
}
}
4.2 Convert Processor
该Processor可以转换当前正在处理的字段数据类型,比如将一个String转换成Integer,如果是数组,则数组中所有成员都会被转换。目前只支持:integer, long, float, double, string, boolean, and auto。
PUT _ingest/pipeline/my-pipeline-id
{
"description": "converts the content of the price field to an double",
"processors" : [
{
"convert" : {
"field" : "price",
"type": "double"
}
}
]
}
4.3 Date Index Name Processor
该 Processor 可以指定文档中的日期字段,将文档分配到符合指定时间格式的索引中,前提是按照官方提供的说明来进行使用。
PUT _ingest/pipeline/monthlyindex
{
"description": "monthly date-time index naming",
"processors" : [
{
"date_index_name" : {
"field" : "date1",
"index_name_prefix" : "my-index-",
"date_rounding" : "M"
}
}
]
}
PUT /my-index/_doc/1?pipeline=monthlyindex
{
"date1" : "2020-04-25T12:02:01.789Z"
}
{
"_index" : "my-index-2020-04-01",
"_type" : "_doc",
"_id" : "1",
"_version" : 1,
"result" : "created",
"_shards" : {
"total" : 2,
"successful" : 1,
"failed" : 0
},
"_seq_no" : 55,
"_primary_term" : 1
}
上面的不会放入myindex索引中,而是myindex-2016-04-01 索引中,该功能与template配合使用可以按日期进行索引。
4.4 Script Processor
该 Processor 是 Ingest 中功能最强大的Processor,利用Elasticsearch提供的脚本能力。
{
"script": {
"lang": "painless",
"source": "ctx.field_a_plus_b_times_c = (ctx.field_a + ctx.field_b) * params.param_c",
"params": {
"param_c": 10
}
}
}
4.5 Set Processor
该 Processor 用于指定字段的值,如果该字段存在时,则修改字段的值,如果该字段不存在,则新增字段并设置该字段的值。
{
"description" : "sets the value of count to 1"
"set": {
"field": "count",
"value": 1
}
}
该处理器也可以用其他字段的值动态赋值:
PUT _ingest/pipeline/set_os
{
"description": "sets the value of host.os.name from the field os",
"processors": [
{
"set": {
"field": "host.os.name",
"value": "{{os}}"
}
}
]
}
POST _ingest/pipeline/set_os/_simulate
{
"docs": [
{
"_source": {
"os": "Ubuntu"
}
}
]
}