在进行spark streaming的开发时,我们常常需要将dstream转为dataframe来进行进一步的处理,
共有两种方式,方式一:
val spark = sparksession.builder() .appname("test") .getorcreate() import spark.implicits._ dstream.foreachrdd{ rdd => val df = rdd.map(_.split(" ")) .map(t => (t(1),t(2),t(3))) .todf("col1","col2","col3") // 业务逻辑 }
利用map算子和tuple来完成,一般的场景下采用这种方式即可。
但是有的时候我们会遇到列数大于22的情况,这个时候会受到scala的tuple数不能超过22的影响。这时可以采用方式二:
val spark = sparksession.builder() .appname("test") .getorcreate() dstream.foreachrdd{ rdd => val res:rdd[row] = rdd.map{ row => val buffer = arraybuffer.empty[any] val fields: array[string] = row.split("\\|~\\|") buffer.append(fields(0)) buffer.append(fields(1)) buffer.append(fields(2)) // 省略 buffer.append(fields(25)) row.fromseq(buffer) } val schema = structtype(seq( structfield("col1", stringtype, false), structfield("col2", stringtype, false), structfield("col3", stringtype, false), // 省略 structfield("col26", stringtype, false) )) val df: dataframe = spark.createdataframe(result, schema) // 业务逻辑 }
您可能感兴趣的文章:
如您对本文有疑问或者有任何想说的,请点击进行留言回复,万千网友为您解惑!
网友评论