在快速开始中,我们演示了接入本地示例数据方式,但druid其实支持非常丰富的数据接入方式。比如批处理数据的接入和实时流数据的接入。本文我们将介绍这几种数据接入方式。
本文主要介绍前两种最常用的数据接入方式。
druid提供以下几种方式加载数据:
通过页面数据加载器
通过控制台
通过命令行
通过curl命令调用
druid提供了一个示例数据文件,其中包含2015年9月12日发生的wiki的示例数据。
此样本数据位于quickstart/tutorial/wikiticker-2015-09-12-sampled.json.gz
示例数据大概是这样:
{ "timestamp":"2015-09-12t20:03:45.018z", "channel":"#en.wikipedia", "namespace":"main", "page":"spider-man's powers and equipment", "user":"foobar", "comment":"/* artificial web-shooters */", "cityname":"new york", "regionname":"new york", "regionisocode":"ny", "countryname":"united states", "countryisocode":"us", "isanonymous":false, "isnew":false, "isminor":false, "isrobot":false, "isunpatrolled":false, "added":99, "delta":99, "deleted":0, }
druid加载数据分为以下几种:
我们这样演示一下加载示例文件数据
base directory输入quickstart/tutorial/
file filter输入 wikiticker-2015-09-12-sampled.json.gz
然后点击apply预览 就可以看见数据了 点击next:parse data解析数据
可以看到json数据已经被解析了 继续解析时间
解析时间成功 之后两步是transform和filter 这里不做演示了 直接next
这一步会让我们确认schema 可以做一些修改
由于数据量较小 我们直接关掉rollup 直接下一步
这里可以设置数据分段 我们选择hour next
等待任务成功
选择datasources 可以看到我们加载的数据
可以看到数据源名称 fully是完全可用 还有大小等各种信息
点击query按钮
我们可以写sql查询数据了 还可以将数据下载
在任务视图中,单击submit json task
这将打开规格提交对话框,粘贴规范
{ "type" : "index_parallel", "spec" : { "dataschema" : { "datasource" : "wikipedia", "dimensionsspec" : { "dimensions" : [ "channel", "cityname", "comment", "countryisocode", "countryname", "isanonymous", "isminor", "isnew", "isrobot", "isunpatrolled", "metrocode", "namespace", "page", "regionisocode", "regionname", "user", { "name": "added", "type": "long" }, { "name": "deleted", "type": "long" }, { "name": "delta", "type": "long" } ] }, "timestampspec": { "column": "time", "format": "iso" }, "metricsspec" : [], "granularityspec" : { "type" : "uniform", "segmentgranularity" : "day", "querygranularity" : "none", "intervals" : ["2015-09-12/2015-09-13"], "rollup" : false } }, "ioconfig" : { "type" : "index_parallel", "inputsource" : { "type" : "local", "basedir" : "quickstart/tutorial/", "filter" : "wikiticker-2015-09-12-sampled.json.gz" }, "inputformat" : { "type": "json" }, "appendtoexisting" : false }, "tuningconfig" : { "type" : "index_parallel", "maxrowspersegment" : 5000000, "maxrowsinmemory" : 25000 } } }
查看加载任务即可。
为了方便起见,druid提供了一个加载数据的脚本
bin/post-index-task
我们可以运行命令
bin/post-index-task --file quickstart/tutorial/wikipedia-index.json --url http://localhost:8081
看到如下输出:
beginning indexing data for wikipedia task started: index_wikipedia_2018-07-27t06:37:44.323z task log: http://localhost:8081/druid/indexer/v1/task/index_wikipedia_2018-07-27t06:37:44.323z/log task status: http://localhost:8081/druid/indexer/v1/task/index_wikipedia_2018-07-27t06:37:44.323z/status task index_wikipedia_2018-07-27t06:37:44.323z still running... task index_wikipedia_2018-07-27t06:37:44.323z still running... task finished with status: success completed indexing data for wikipedia. now loading indexed data onto the cluster... wikipedia loading complete! you may now query your data
查看加载任务即可。
我们可以通过直接调用curl来加载数据
curl -x 'post' -h 'content-type:application/json' -d @quickstart/tutorial/wikipedia-index.json http://localhost:8081/druid/indexer/v1/task
提交成功
{"task":"index_wikipedia_2018-06-09t21:30:32.802z"}
apache kafka是一个高性能的消息系统,由scala 写成。是由apache 软件基金会开发的一个开源消息系统项目。
kafka 最初是由linkedin 开发,并于2011 年初开源。2012 年10 月从apache incubator 毕业。该项目的目标是为处理实时数据提供一个统一、高通量、低等待(低延时)的平台。
更多kafka相关请查看kafka入门宝典(详细截图版)
我们安装一个最新的kafka
curl -o https://archive.apache.org/dist/kafka/2.1.0/kafka_2.12-2.1.0.tgz tar -xzf kafka_2.12-2.1.0.tgz cd kafka_2.12-2.1.0
启动kafka
./bin/kafka-server-start.sh config/server.properties
创建一个topic
./bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic wikipedia
向kafka的topic为wikipedia写入数据
cd quickstart/tutorial gunzip -c wikiticker-2015-09-12-sampled.json.gz > wikiticker-2015-09-12-sampled.json
在kafka目录中运行命令 {path_to_druid}替换为druid目录
export kafka_opts="-dfile.encoding=utf-8" ./bin/kafka-console-producer.sh --broker-list localhost:9092 --topic wikipedia < {path_to_druid}/quickstart/tutorial/wikiticker-2015-09-12-sampled.json
druid加载kafka的数据也有多种方式
选择apache kafka
并单击connect data
localhost:9092
2.3.1.4 解析时间戳 设置转换 设置过滤
在任务视图中,单击submit json supervisor
以打开对话框。
粘贴进去如下指令
{ "type": "kafka", "spec" : { "dataschema": { "datasource": "wikipedia", "timestampspec": { "column": "time", "format": "auto" }, "dimensionsspec": { "dimensions": [ "channel", "cityname", "comment", "countryisocode", "countryname", "isanonymous", "isminor", "isnew", "isrobot", "isunpatrolled", "metrocode", "namespace", "page", "regionisocode", "regionname", "user", { "name": "added", "type": "long" }, { "name": "deleted", "type": "long" }, { "name": "delta", "type": "long" } ] }, "metricsspec" : [], "granularityspec": { "type": "uniform", "segmentgranularity": "day", "querygranularity": "none", "rollup": false } }, "tuningconfig": { "type": "kafka", "reportparseexceptions": false }, "ioconfig": { "topic": "wikipedia", "inputformat": { "type": "json" }, "replicas": 2, "taskduration": "pt10m", "completiontimeout": "pt20m", "consumerproperties": { "bootstrap.servers": "localhost:9092" } } } }
我们也可以通过直接调用curl来加载kafka数据
curl -xpost -h'content-type: application/json' -d @quickstart/tutorial/wikipedia-kafka-supervisor.json http://www.lhsxpumps.com/_localhost:8081/druid/indexer/v1/supervisor
静下心来,努力的提升自己,永远都没有错。更多实时计算相关博文,欢迎关注实时流式计算
如对本文有疑问, 点击进行留言回复!!
HBase Filter 过滤器之FamilyFilter详解
去 HBase,Kylin on Parquet 性能表现如何?
如何找到Hive提交的SQL相对应的Yarn程序的applicationId
网友评论