1. Spark Streaming
2. Popular real-time computing engines
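Engine             Throughput   Language      Processing latency                            Ecosystem
Storm              Lower        Clojure       Very low (milliseconds, per record)           Alibaba (JStorm)
Flink              Higher       Java/Scala    Low (sub-second)                              Little used in China at the time of writing
Spark Streaming    Very high    Scala         Low (micro-batches, sub-second to seconds)    Mature ecosystem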
3. Processing network (socket) data with Spark Streaming
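import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Milliseconds, StreamingContext}
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}

// Create the StreamingContext. At least two threads are required:
// one to receive data and one to process it.
val conf = new SparkConf().setAppName("ops1").setMaster("local[2]")
val ssc = new StreamingContext(conf, Milliseconds(3000))
// Read lines of text from a TCP socket
val receiverDS: ReceiverInputDStream[String] = ssc.socketTextStream("uplooking01", 44444)
// Count the comma-separated words in each 3-second batch
val pairRetDS: DStream[(String, Int)] = receiverDS.flatMap(_.split(",")).map((_, 1)).reduceByKey(_ + _)
pairRetDS.print()
// Start the streaming computation
ssc.start()
// Block until the computation is stopped or fails
ssc.awaitTermination()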
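To make clearer what the job above computes for each 3-second batch, here is a minimal plain-Java sketch of the same word count applied to one batch of lines. It does not use Spark at all; the class name BatchWordCount and the sample lines are assumptions made up for illustration.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper, not part of Spark: mimics flatMap + map + reduceByKey on one batch.
public class BatchWordCount {

    // Counts comma-separated words across all lines received in one micro-batch.
    public static Map<String, Integer> countWords(List<String> batchLines) {
        Map<String, Integer> counts = new LinkedHashMap<>();
        for (String line : batchLines) {
            // flatMap(_.split(",")): every line contributes several words
            for (String word : line.split(",")) {
                // map((_, 1)) followed by reduceByKey(_ + _): add 1 per occurrence
                counts.merge(word, 1, Integer::sum);
            }
        }
        return counts;
    }

    public static void main(String[] args) {
        // Sample batch (made-up data)
        List<String> batch = Arrays.asList("spark,kafka", "spark,flume,spark");
        System.out.println(countWords(batch)); // prints {spark=3, kafka=1, flume=1}
    }
}
```

Each batch is counted independently here, which matches the job above: reduceByKey only aggregates within a batch, so the counts start again from zero for the next batch.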
4. Two ways for Spark Streaming to receive data (Kafka)
Receiver
Direct
5. Integrating Spark Streaming with Kafka
Receiver-based integration (not recommended in production; the spark-streaming-kafka-0-10 integration no longer provides it)
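import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Milliseconds, StreamingContext}
import org.apache.spark.streaming.dstream.ReceiverInputDStream
import org.apache.spark.streaming.kafka.KafkaUtils

// Create the StreamingContext. At least two threads are required:
// one to receive data and one to process it.
val conf = new SparkConf().setAppName("ops1").setMaster("local[2]")
val ssc = new StreamingContext(conf, Milliseconds(3000))
val zkQuorum = "uplooking03:2181,uplooking04:2181,uplooking05:2181"
val groupId = "myid"
// Topic name -> number of consumer threads in the receiver
val topics = Map("hadoop" -> 3)
val receiverDS: ReceiverInputDStream[(String, String)] = KafkaUtils.createStream(ssc, zkQuorum, groupId, topics)
// Each record is a (key, value) pair; count the words in the value
receiverDS.flatMap(_._2.split(" ")).map((_, 1)).reduceByKey(_ + _).print()
ssc.start()
ssc.awaitTermination()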
Direct integration (the approach to use in production)
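import kafka.serializer.StringDecoder
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Milliseconds, StreamingContext}
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka.KafkaUtils

// Create the StreamingContext. The direct approach uses no receiver,
// so no thread is reserved for receiving; local[2] is kept from the examples above.
val conf = new SparkConf().setAppName("ops1").setMaster("local[2]")
val ssc = new StreamingContext(conf, Milliseconds(3000))
val kafkaParams = Map("metadata.broker.list" -> "uplooking03:9092,uplooking04:9092,uplooking05:9092")
val topics = Set("hadoop")
val inputDS: InputDStream[(String, String)] = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topics)
// Each record is a (key, value) pair; count the words in the value
inputDS.flatMap(_._2.split(" ")).map((_, 1)).reduceByKey(_ + _).print()
ssc.start()
ssc.awaitTermination()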
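6. Architecture of a real-time stream computing pipeline
1. Generate logs (simulating the access logs of users visiting a web application)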
import java.io.FileWriter;
import java.io.IOException;
import java.io.PrintWriter;
import java.util.Date;
import java.util.Random;

public class GenerateAccessLog {
    private static final Random RANDOM = new Random();

    public static void main(String[] args) throws IOException, InterruptedException {
        // Prepare the sample data
        int[] ips = {123, 18, 123, 112, 181, 16, 172, 183, 190, 191, 196, 120};
        String[] requestTypes = {"GET", "POST"};
        String[] cursors = {"/vip/112", "/vip/113", "/vip/114", "/vip/115", "/vip/116", "/vip/117", "/vip/118", "/vip/119", "/vip/120", "/vip/121", "/free/210", "/free/211", "/free/212", "/free/213", "/free/214", "/company/312", "/company/313", "/company/314", "/company/315"};
        String[] courseNames = {"大数据", "python", "java", "c++", "c", "scala", "android", "spark", "hadoop", "redis"};
        String[] references = {"www.baidu.com/", "www.sougou.com/", "www.sou.com/", "www.google.com"};
        // args[0] is the path of the log file to write
        FileWriter fw = new FileWriter(args[0]);
        PrintWriter printWriter = new PrintWriter(fw);
        while (true) {
            // Thread.sleep(1000);
            // Generate the fields of one log line
            String date = new Date().toLocaleString();
            String method = requestTypes[getRandomNum(0, requestTypes.length)];
            String url = "/cursor" + cursors[getRandomNum(0, cursors.length)];
            String httpVersion = "HTTP/1.1";
            String ip = ips[getRandomNum(0, ips.length)] + "." + ips[getRandomNum(0, ips.length)] + "." + ips[getRandomNum(0, ips.length)] + "." + ips[getRandomNum(0, ips.length)];
            String reference = references[getRandomNum(0, references.length)];
            String rowLog = date + " " + method + " " + url + " " + httpVersion + " " + ip + " " + reference;
            printWriter.println(rowLog);
            printWriter.flush();
        }
    }

    // Returns a random integer in [start, end)
    public static int getRandomNum(int start, int end) {
        return RANDOM.nextInt(end - start) + start;
    }
}
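A downstream job has to split each generated line back into its fields. The sketch below shows one way to do that in plain Java. The class AccessLogParser and its nested AccessLog type are hypothetical names introduced here, and the sample line in main is made up. Because the date produced by toLocaleString() is locale-dependent and itself contains spaces, the sketch takes the last five space-separated tokens as method, URL, HTTP version, IP and referer, and treats everything before them as the date.

```java
import java.util.Arrays;

// Hypothetical parser for the log lines written by the generator above.
public class AccessLogParser {

    // Simple immutable holder for the six fields of one log line.
    public static final class AccessLog {
        public final String date, method, url, httpVersion, ip, referer;

        AccessLog(String date, String method, String url, String httpVersion, String ip, String referer) {
            this.date = date;
            this.method = method;
            this.url = url;
            this.httpVersion = httpVersion;
            this.ip = ip;
            this.referer = referer;
        }
    }

    // Parses from the right, since only the date may contain extra spaces.
    public static AccessLog parse(String line) {
        String[] t = line.trim().split(" ");
        if (t.length < 6) {
            throw new IllegalArgumentException("malformed log line: " + line);
        }
        int n = t.length;
        // Everything before the last five tokens belongs to the date
        String date = String.join(" ", Arrays.copyOfRange(t, 0, n - 5));
        return new AccessLog(date, t[n - 5], t[n - 4], t[n - 3], t[n - 2], t[n - 1]);
    }

    public static void main(String[] args) {
        // Made-up sample line in the generator's format
        AccessLog log = parse("2020-7-21 10:15:30 GET /cursor/vip/112 HTTP/1.1 123.18.112.181 www.baidu.com/");
        System.out.println(log.url + " from " + log.ip); // prints /cursor/vip/112 from 123.18.112.181
    }
}
```

Parsing from the right keeps the sketch independent of the machine's locale, which changes how many tokens the date occupies.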
2. Flume collects the web application server's log data via Avro
Collect the output of a command and send it to an Avro sink
# The configuration file needs to define the sources,
# the channels and the sinks.
# Sources, channels and sinks are defined per agent,
# in this case called 'f1'
f1.sources = r1
f1.channels = c1
f1.sinks = k1
# define sources
f1.sources.r1.type = exec
f1.sources.r1.command = tail -F /logs/access.log
# define channels
f1.channels.c1.type = memory
f1.channels.c1.capacity = 1000
f1.channels.c1.transactionCapacity = 100
# define sink: send the collected logs to uplooking03
f1.sinks.k1.type = avro
f1.sinks.k1.hostname = uplooking03
f1.sinks.k1.port = 44444
# bind sources and sink to channel
f1.sources.r1.channels = c1
f1.sinks.k1.channel = c1

Collect from Avro to the console

# The configuration file needs to define the sources,
# the channels and the sinks.
# Sources, channels and sinks are defined per agent,
# in this case called 'f2'
f2.sources = r2
f2.channels = c2
f2.sinks = k2
# define sources
f2.sources.r2.type = avro
f2.sources.r2.bind = uplooking03
f2.sources.r2.port = 44444
# define channels
f2.channels.c2.type = memory
f2.channels.c2.capacity = 1000
f2.channels.c2.transactionCapacity = 100
# define sink
f2.sinks.k2.type = logger
# bind sources and sink to channel
f2.sources.r2.channels = c2
f2.sinks.k2.channel = c2

Collect from Avro into Kafka

# The configuration file needs to define the sources,
# the channels and the sinks.
# Sources, channels and sinks are defined per agent,
# in this case called 'f2'
f2.sources = r2
f2.channels = c2
f2.sinks = k2
# define sources
f2.sources.r2.type = avro
f2.sources.r2.bind = uplooking03
f2.sources.r2.port = 44444
# define channels
f2.channels.c2.type = memory
f2.channels.c2.capacity = 1000
f2.channels.c2.transactionCapacity = 100
# define sink
f2.sinks.k2.type = org.apache.flume.sink.kafka.KafkaSink
f2.sinks.k2.topic = hadoop
f2.sinks.k2.brokerList = uplooking03:9092,uplooking04:9092,uplooking05:9092
f2.sinks.k2.requiredAcks = 1
# bind sources and sink to channel (missing from the original listing; needed as in the two configurations above)
f2.sources.r2.channels = c2
f2.sinks.k2.channel = c2