使用 Spark 处理复杂 JSON(字段不固定、多层嵌套)

背景:一条 JSON 消息中,数据结构会随某个字段(本例中为 infoType)取值的不同而不同,而且 JSON 有多层嵌套。

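主要使用了以下两个函数:

  • explode:将 array 或 map 展开为多行,每个元素占一行(例如可用于展开 one_voltages 这类数组字段)
  • inline:将 array<struct<...>> 直接展开为多行,struct 中的每个字段各成一列

注意:spark.read.json 默认要求每行是一个完整的 JSON 对象(JSON Lines 格式)。下例中的文件虽然扩展名是 .csv,内容实际是 JSON。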
scala> val res0 = spark.read.json("file:///home/hdfs/20200422.csv")
res0: org.apache.spark.sql.DataFrame = [collectTime: string, factory: string ... 2 more fields]

scala> res0.printSchema
root
 |-- collectTime: string (nullable = true)
 |-- factory: string (nullable = true)
 |-- infoList: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- info: struct (nullable = true)
 |    |    |    |-- EnergyStorageDevice: array (nullable = true)
 |    |    |    |    |-- element: struct (containsNull = true)
 |    |    |    |    |    |-- batterynum: long (nullable = true)
 |    |    |    |    |    |-- dianliu: long (nullable = true)
 |    |    |    |    |    |-- no: long (nullable = true)
 |    |    |    |    |    |-- one_voltages: array (nullable = true)
 |    |    |    |    |    |    |-- element: long (containsNull = true)
 |    |    |    |    |    |-- this_batterynum: long (nullable = true)
 |    |    |    |    |    |-- this_no: long (nullable = true)
 |    |    |    |    |    |-- voltage: long (nullable = true)
 |    |    |    |-- EnergyStorageDeviceTemperature: array (nullable = true)
 |    |    |    |    |-- element: struct (containsNull = true)
 |    |    |    |    |    |-- no: long (nullable = true)
 |    |    |    |    |    |-- probe_num: long (nullable = true)
 |    |    |    |    |    |-- probe_values: array (nullable = true)
 |    |    |    |    |    |    |-- element: long (containsNull = true)
 |    |    |    |-- alarmSign: string (nullable = true)
 |    |    |    |-- brakeStatus: long (nullable = true)
 |    |    |    |-- chargeStatus: long (nullable = true)
 |    |    |    |-- dcdc: long (nullable = true)
 |    |    |    |-- driveMotor: array (nullable = true)
 |    |    |    |    |-- element: struct (containsNull = true)
 |    |    |    |    |    |-- controllerT: long (nullable = true)
 |    |    |    |    |    |-- current: long (nullable = true)
 |    |    |    |    |    |-- inputVoltage: long (nullable = true)
 |    |    |    |    |    |-- no: long (nullable = true)
 |    |    |    |    |    |-- rev: long (nullable = true)
 |    |    |    |    |    |-- status: long (nullable = true)
 |    |    |    |    |    |-- temperature: long (nullable = true)
 |    |    |    |    |    |-- torque: long (nullable = true)
 |    |    |    |-- engineFaultCount: long (nullable = true)
 |    |    |    |-- engineFaultList: string (nullable = true)
 |    |    |    |-- faultList: string (nullable = true)
 |    |    |    |-- gears: long (nullable = true)
 |    |    |    |-- latitude: string (nullable = true)
 |    |    |    |-- location: long (nullable = true)
 |    |    |    |-- longitude: string (nullable = true)
 |    |    |    |-- maxAlarmLevel: long (nullable = true)
 |    |    |    |-- maxCodeV: long (nullable = true)
 |    |    |    |-- maxProbeNoT: long (nullable = true)
 |    |    |    |-- maxSysNoT: long (nullable = true)
 |    |    |    |-- maxSysNoV: long (nullable = true)
 |    |    |    |-- maxT: long (nullable = true)
 |    |    |    |-- maxValueV: double (nullable = true)
 |    |    |    |-- minCodeV: long (nullable = true)
 |    |    |    |-- minProbeNoT: long (nullable = true)
 |    |    |    |-- minSysNoT: long (nullable = true)
 |    |    |    |-- minSysNoV: long (nullable = true)
 |    |    |    |-- minT: long (nullable = true)
 |    |    |    |-- minValueV: double (nullable = true)
 |    |    |    |-- motorFaultCount: long (nullable = true)
 |    |    |    |-- motorFaultList: string (nullable = true)
 |    |    |    |-- number: long (nullable = true)
 |    |    |    |-- odo: double (nullable = true)
 |    |    |    |-- otherFaultCount: long (nullable = true)
 |    |    |    |-- otherFaultList: string (nullable = true)
 |    |    |    |-- runMode: long (nullable = true)
 |    |    |    |-- sir: long (nullable = true)
 |    |    |    |-- soc: double (nullable = true)
 |    |    |    |-- speed: double (nullable = true)
 |    |    |    |-- storedFaultCount: long (nullable = true)
 |    |    |    |-- totalCurrent: long (nullable = true)
 |    |    |    |-- totalVoltage: long (nullable = true)
 |    |    |    |-- travel: double (nullable = true)
 |    |    |    |-- vehicleStatus: long (nullable = true)
 |    |    |-- infoType: long (nullable = true)
 |-- vin: string (nullable = true)

scala> res0.show()
+--------------+-----------+--------------------+-----------------+
|   collectTime|    factory|            infoList|              vin|
+--------------+-----------+--------------------+-----------------+
|20200422131027|     futian|[[[,,, 0, 3, 2,,,...|LVBV3J0B6JE900137|
|20200422131026|yiqijiefang|[[[,,, 0, 3, 1,,,...|LFNA4LDA1JAX07996|
+--------------+-----------+--------------------+-----------------+


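第一步:用 inline(infoList) 展开外层数组。infoList 中的每个元素变成一行,其 struct 字段 info 和 infoType 各成一列。Spark 推断 schema 时会合并所有消息的字段,所以 info 包含各种 infoType 的全部字段,不属于当前 infoType 的字段为 null。

scala> val res1 = res0.select(res0("collectTime"), res0("factory"), expr("inline(infoList)"), res0("vin"))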
res1: org.apache.spark.sql.DataFrame = [collectTime: string, factory: string ... 3 more fields]

scala> res1.printSchema
root
 |-- collectTime: string (nullable = true)
 |-- factory: string (nullable = true)
 |-- info: struct (nullable = true)
 |    |-- EnergyStorageDevice: array (nullable = true)
 |    |    |-- element: struct (containsNull = true)
 |    |    |    |-- batterynum: long (nullable = true)
 |    |    |    |-- dianliu: long (nullable = true)
 |    |    |    |-- no: long (nullable = true)
 |    |    |    |-- one_voltages: array (nullable = true)
 |    |    |    |    |-- element: long (containsNull = true)
 |    |    |    |-- this_batterynum: long (nullable = true)
 |    |    |    |-- this_no: long (nullable = true)
 |    |    |    |-- voltage: long (nullable = true)
 |    |-- EnergyStorageDeviceTemperature: array (nullable = true)
 |    |    |-- element: struct (containsNull = true)
 |    |    |    |-- no: long (nullable = true)
 |    |    |    |-- probe_num: long (nullable = true)
 |    |    |    |-- probe_values: array (nullable = true)
 |    |    |    |    |-- element: long (containsNull = true)
 |    |-- alarmSign: string (nullable = true)
 |    |-- brakeStatus: long (nullable = true)
 |    |-- chargeStatus: long (nullable = true)
 |    |-- dcdc: long (nullable = true)
 |    |-- driveMotor: array (nullable = true)
 |    |    |-- element: struct (containsNull = true)
 |    |    |    |-- controllerT: long (nullable = true)
 |    |    |    |-- current: long (nullable = true)
 |    |    |    |-- inputVoltage: long (nullable = true)
 |    |    |    |-- no: long (nullable = true)
 |    |    |    |-- rev: long (nullable = true)
 |    |    |    |-- status: long (nullable = true)
 |    |    |    |-- temperature: long (nullable = true)
 |    |    |    |-- torque: long (nullable = true)
 |    |-- engineFaultCount: long (nullable = true)
 |    |-- engineFaultList: string (nullable = true)
 |    |-- faultList: string (nullable = true)
 |    |-- gears: long (nullable = true)
 |    |-- latitude: string (nullable = true)
 |    |-- location: long (nullable = true)
 |    |-- longitude: string (nullable = true)
 |    |-- maxAlarmLevel: long (nullable = true)
 |    |-- maxCodeV: long (nullable = true)
 |    |-- maxProbeNoT: long (nullable = true)
 |    |-- maxSysNoT: long (nullable = true)
 |    |-- maxSysNoV: long (nullable = true)
 |    |-- maxT: long (nullable = true)
 |    |-- maxValueV: double (nullable = true)
 |    |-- minCodeV: long (nullable = true)
 |    |-- minProbeNoT: long (nullable = true)
 |    |-- minSysNoT: long (nullable = true)
 |    |-- minSysNoV: long (nullable = true)
 |    |-- minT: long (nullable = true)
 |    |-- minValueV: double (nullable = true)
 |    |-- motorFaultCount: long (nullable = true)
 |    |-- motorFaultList: string (nullable = true)
 |    |-- number: long (nullable = true)
 |    |-- odo: double (nullable = true)
 |    |-- otherFaultCount: long (nullable = true)
 |    |-- otherFaultList: string (nullable = true)
 |    |-- runMode: long (nullable = true)
 |    |-- sir: long (nullable = true)
 |    |-- soc: double (nullable = true)
 |    |-- speed: double (nullable = true)
 |    |-- storedFaultCount: long (nullable = true)
 |    |-- totalCurrent: long (nullable = true)
 |    |-- totalVoltage: long (nullable = true)
 |    |-- travel: double (nullable = true)
 |    |-- vehicleStatus: long (nullable = true)
 |-- infoType: long (nullable = true)
 |-- vin: string (nullable = true)

scala> res1.show()
+--------------+-----------+--------------------+--------+-----------------+
|   collectTime|    factory|                info|infoType|              vin|
+--------------+-----------+--------------------+--------+-----------------+
|20200422131027|     futian|[,,, 0, 3, 2,,,,,...|       1|LVBV3J0B6JE900137|
|20200422131027|     futian|[,,,,,, [[18, 0, ...|       2|LVBV3J0B6JE900137|
|20200422131027|     futian|[[[152, 10000, 1,...|       8|LVBV3J0B6JE900137|
|20200422131027|     futian|[, [[1, 38, [60, ...|       9|LVBV3J0B6JE900137|
|20200422131027|     futian|[,,,,,,,,,,,,,,, ...|       6|LVBV3J0B6JE900137|
|20200422131027|     futian|[,,,,,,,,,,, 39.7...|       5|LVBV3J0B6JE900137|
|20200422131027|     futian|[,, 0000000000000...|       7|LVBV3J0B6JE900137|
|20200422131026|yiqijiefang|[,,, 0, 3, 1,,,,,...|       1|LFNA4LDA1JAX07996|
|20200422131026|yiqijiefang|[,,,,,, [[26, 30,...|       2|LFNA4LDA1JAX07996|
|20200422131026|yiqijiefang|[,,,,,,,,,,, 39.8...|       5|LFNA4LDA1JAX07996|
|20200422131026|yiqijiefang|[,,,,,,,,,,,,,,, ...|       6|LFNA4LDA1JAX07996|
|20200422131026|yiqijiefang|[,, 0000000000000...|       7|LFNA4LDA1JAX07996|
|20200422131026|yiqijiefang|[[[160, 10320, 1,...|       8|LFNA4LDA1JAX07996|
|20200422131026|yiqijiefang|[, [[1, 32, [58, ...|       9|LFNA4LDA1JAX07996|
+--------------+-----------+--------------------+--------+-----------------+


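第二步:按 infoType 过滤出某一种结构的记录。infoType=8 时,info.EnergyStorageDevice 是 array<struct>,可再次用 inline 将其展开,每个 struct 字段成为一列:

scala> res1.filter("infoType=8").select(res1("collectTime"), res1("factory"), expr("inline(info.EnergyStorageDevice)"), res1("infoType"),res1("vin")).show()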
+--------------+-----------+----------+-------+---+--------------------+---------------+-------+-------+--------+-----------------+
|   collectTime|    factory|batterynum|dianliu| no|        one_voltages|this_batterynum|this_no|voltage|infoType|              vin|
+--------------+-----------+----------+-------+---+--------------------+---------------+-------+-------+--------+-----------------+
|20200422131027|     futian|       152|  10000|  1|[38927, 56335, 56...|            152|      1|   6180|       8|LVBV3J0B6JE900137|
|20200422131026|yiqijiefang|       160|  10320|  1|[40972, 41740, 50...|            160|      1|   5200|       8|LFNA4LDA1JAX07996|
+--------------+-----------+----------+-------+---+--------------------+---------------+-------+-------+--------+-----------------+


infoType=5 时,所需字段(如 latitude)是 info 结构体中的普通字段,直接用点号路径 info.latitude 取值即可:

scala> res1.filter("infoType=5").select(res1("collectTime"), res1("factory"), res1("info.latitude"), res1("infoType"),res1("vin")).show()
+--------------+-----------+---------+--------+-----------------+
|   collectTime|    factory| latitude|infoType|              vin|
+--------------+-----------+---------+--------+-----------------+
|20200422131027|     futian|39.718566|       5|LVBV3J0B6JE900137|
|20200422131026|yiqijiefang|39.875452|       5|LFNA4LDA1JAX07996|
+--------------+-----------+---------+--------+-----------------+

小结:先用 inline 展开外层的 infoList 数组,再按 infoType 过滤出对应结构的记录。其中数组类型的字段(如 EnergyStorageDevice)继续用 inline 或 explode 展开,普通字段则用点号路径直接取值。