Druid Hadoop-based Batch Ingestion

2020-6-21

Background

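The Kafka Indexing Service creates segments per topic partition. Suppose a topic has 12 partitions and the segment granularity is one hour: a single day can then produce up to 12 × 24 = 288 segments. The official documentation recommends a segment size of 500-700 MB, yet some of these segments are only a few tens of KB, which is far from reasonable.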
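The worst-case segment count can be checked with a few lines of arithmetic. This is a minimal sketch under the simple assumption of one segment per partition per time bucket; the function name is illustrative and not part of Druid.

```python
def max_segments_per_day(partitions: int, segment_hours: int = 1) -> int:
    """Upper bound on daily segments, assuming one segment per partition per time bucket."""
    if partitions <= 0 or segment_hours <= 0 or 24 % segment_hours != 0:
        raise ValueError("partitions must be positive and segment_hours must divide 24")
    buckets_per_day = 24 // segment_hours
    return partitions * buckets_per_day


# 12 partitions with hourly buckets
print(max_segments_per_day(12))
```

Under the same assumption, a coarser bucket such as `segment_hours=24` brings the bound down to one segment per partition per day.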

Merging

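The merge example in the official documentation did not run successfully when I tried it. After some trial and error, the following task spec worked: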

{
  "type" : "index_hadoop",
  "spec" : {
    "dataSchema" : {
      "dataSource" : "wikipedia",
      "parser" : {
        "type" : "hadoopyString",
        "parseSpec" : {
          "format" : "json",
          "timestampSpec" : {
            "column" : "timestamp",
            "format" : "auto"
          },
          "dimensionsSpec" : {
            "dimensions": ["page","language","user","unpatrolled","newPage","robot","anonymous","namespace","continent","country","region","city"],
            "dimensionExclusions" : [],
            "spatialDimensions" : []
          }
        }
      },
      "metricsSpec" : [
        {
          "type" : "count",
          "name" : "count"
        },
        {
          "type" : "doubleSum",
          "name" : "added",
          "fieldName" : "added"
        },
        {
          "type" : "doubleSum",
          "name" : "deleted",
          "fieldName" : "deleted"
        },
        {
          "type" : "doubleSum",
          "name" : "delta",
          "fieldName" : "delta"
        }
      ],
      "granularitySpec" : {
        "type" : "uniform",
        "segmentGranularity" : "DAY",
        "queryGranularity" : "NONE",
        "intervals" : [ "2013-08-31/2013-09-01" ]
      }
    },
    "ioConfig" : {
      "type" : "hadoop",
      "inputSpec" : {
        "type" : "dataSource",
        "ingestionSpec" : {
          "dataSource" : "wikipedia",
          "intervals" : [ "2013-08-31/2013-09-01" ]
        }
      }
    },
    "tuningConfig" : {
      "type" : "hadoop"
    }
  }
}

Notes

"inputSpec" : {
  "type" : "dataSource",
  "ingestionSpec" : {
    "dataSource" : "wikipedia",
    "intervals" : [ "2013-08-31/2013-09-01" ]
  }
}

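Set the working directory for the Hadoop job. By default it uses /tmp; if the temporary directory has little free space, the task will fail to run properly. The jobProperties in the following spec point the job at a different directory: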

{
  "type" : "index_hadoop",
  "spec" : {
    "dataSchema" : {
      "dataSource" : "test",
      "parser" : {
        "type" : "hadoopyString",
        "parseSpec" : {
          "format" : "json",
          "timestampSpec" : {
            "column" : "timeStamp",
            "format" : "auto"
          },
          "dimensionsSpec" : {
            "dimensions" : [ "test_id" ],
            "dimensionExclusions" : [ "timeStamp", "value" ]
          }
        }
      },
      "metricsSpec" : [
        {
          "type" : "count",
          "name" : "count"
        }
      ],
      "granularitySpec" : {
        "type" : "uniform",
        "segmentGranularity" : "MONTH",
        "queryGranularity" : "HOUR",
        "intervals" : [ "2017-12-01/2018-01-01" ]
      }
    },
    "ioConfig" : {
      "type" : "hadoop",
      "inputSpec" : {
        "type" : "dataSource",
        "ingestionSpec" : {
          "dataSource" : "test",
          "intervals" : [ "2017-12-01/2018-01-01" ]
        }
      }
    },
    "tuningConfig" : {
      "type" : "hadoop",
      "maxRowsInMemory" : 500000,
      "partitionsSpec" : {
        "type" : "hashed",
        "targetPartitionSize" : 5000000
      },
      "numBackgroundPersistThreads" : 1,
      "jobProperties" : {
        "mapreduce.job.local.dir" : "/home/ant/druid/druid-0.11.0/var/mapred",
        "mapreduce.cluster.local.dir" : "/home/ant/druid/druid-0.11.0/var/mapred",
        "mapred.job.map.memory.mb" : 2300,
        "mapreduce.reduce.memory.mb" : 2300
      }
    }
  }
}
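If several task specs need the same working directory, the two local-dir properties can be patched in programmatically rather than edited by hand. The following is a rough sketch only: the helper name `with_local_dirs` and the path `/data/druid/mapred` are hypothetical, and it assumes `tuningConfig` sits directly under `spec` as in the spec above.

```python
import copy
import json


def with_local_dirs(task: dict, local_dir: str) -> dict:
    """Return a copy of an index_hadoop task whose MapReduce local dirs point at local_dir."""
    patched = copy.deepcopy(task)  # leave the caller's spec untouched
    tuning = patched["spec"].setdefault("tuningConfig", {"type": "hadoop"})
    props = tuning.setdefault("jobProperties", {})
    # Property names taken from the spec above.
    props["mapreduce.job.local.dir"] = local_dir
    props["mapreduce.cluster.local.dir"] = local_dir
    return patched


task = {"type": "index_hadoop", "spec": {"tuningConfig": {"type": "hadoop"}}}
patched = with_local_dirs(task, "/data/druid/mapred")  # hypothetical path
print(json.dumps(patched["spec"]["tuningConfig"], indent=2))
```

Working on a deep copy keeps the original spec reusable, for example to submit the same merge for another interval with different settings.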

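The inputSpec of type dataSource describes the data to load: the existing segments of the named datasource within the given intervals.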

Submitting
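A task spec like the ones above is submitted by POSTing it as JSON to the Overlord's `/druid/indexer/v1/task` endpoint. The sketch below uses only the Python standard library; the address `http://localhost:8090` and the function names are assumptions for illustration, so adjust them to your own cluster.

```python
import json
import urllib.request

# Assumption: the Overlord listens here; change host and port for your deployment.
OVERLORD_URL = "http://localhost:8090"


def build_submit_request(task: dict, overlord_url: str = OVERLORD_URL) -> urllib.request.Request:
    """Build (but do not send) the POST request that submits a task spec."""
    body = json.dumps(task).encode("utf-8")
    return urllib.request.Request(
        url=overlord_url.rstrip("/") + "/druid/indexer/v1/task",
        data=body,
        headers={"Content-Type": "application/json"},
        method="POST",
    )


def submit_task(task: dict, overlord_url: str = OVERLORD_URL) -> str:
    """Send the task spec and return the task id reported by the Overlord."""
    with urllib.request.urlopen(build_submit_request(task, overlord_url)) as resp:
        return json.load(resp)["task"]
```

With the spec saved to a file, loading it with `json.load` and passing the result to `submit_task` should return the new task's id, which can then be followed in the Overlord console.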

Other solutions

Druid itself provides a merge task, but I still recommend doing the merge directly through Hadoop.

References

http://druid.io/docs/latest/ingestion/batch-ingestion.html

http://druid.io/docs/latest/ingestion/update-existing-data.html

Author: 极客编程111