当前位置: 移动技术网 > IT编程>数据库>其他数据库 > DStream转为DF的两种方式(突破map时元组22的限制)

DStream转为DF的两种方式(突破map时元组22的限制)

2019年07月12日  | 移动技术网IT编程  | 我要评论
在进行Spark Streaming的开发时,我们常常需要将DStream转为DataFrame来进行进一步的处理, 共有两种方式,方式一: 利用map算子和tuple来完成,一般的场景下采用这种方式即可。 但是有的时候我们会遇到列数大于22的情况,这个时候会受到scala的tuple数不能超过22 ...

在进行spark streaming的开发时,我们常常需要将dstream转为dataframe来进行进一步的处理,
共有两种方式,方式一:

val spark = sparksession.builder()
  .appname("test")
  .getorcreate()
import spark.implicits._
dstream.foreachrdd{ rdd =>
  val df = rdd.map(_.split(" "))
    .map(t => (t(1),t(2),t(3)))
    .todf("col1","col2","col3")
  // 业务逻辑
}

利用map算子和tuple来完成,一般的场景下采用这种方式即可。

但是有的时候我们会遇到列数大于22的情况,这个时候会受到scala的tuple数不能超过22的影响。这时可以采用方式二:

val spark = sparksession.builder()
  .appname("test")
  .getorcreate()
dstream.foreachrdd{ rdd =>
  val res:rdd[row] = rdd.map{ row =>
    val buffer = arraybuffer.empty[any]
    val fields: array[string] = row.split("\\|~\\|")
    buffer.append(fields(0))
    buffer.append(fields(1))
    buffer.append(fields(2))
    // 省略
    buffer.append(fields(25))
    row.fromseq(buffer)
  } 
  val schema = structtype(seq(
    structfield("col1", stringtype, false),
    structfield("col2", stringtype, false),
    structfield("col3", stringtype, false),
    // 省略
    structfield("col26", stringtype, false)
  ))
  val df: dataframe = spark.createdataframe(result, schema)
  // 业务逻辑
}

如您对本文有疑问或者有任何想说的,请点击进行留言回复,万千网友为您解惑!

相关文章:

验证码:
移动技术网