Enterprise Spark Case Study: Taxi Trajectory Analysis

I. Data Cleaning
Learning objectives:
1. How to read a CSV file with Spark.
2. How to use regular expressions to clean out extraneous strings.
Task: normalize the taxi trajectory data, clean out the extraneous strings, and print the result with .show().
Clean out the $ and @ characters (marked with a red box in the original screenshot). Because these two characters appear an unpredictable number of times, regular-expression matching is required.
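A minimal sketch of the replacement rule, applied to a made-up field value (the sample string below is illustrative, not from the dataset):

    // Strip every run of '@' and '$' from a field value.
    // '$' must be escaped because it is a regex anchor; '@' needs no escaping.
    val raw     = "13726368586@@20000589$$"   // hypothetical dirty value
    val cleaned = raw.replaceAll("@+", "").replaceAll("\\$+", "")
    // cleaned == "1372636858620000589"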
The cleaned output (shown as a screenshot in the original) is produced by the following reference solution:
import org.apache.spark.sql.SparkSession

object Step1 {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("Step1").master("local").getOrCreate()
    /**********begin**********/
    // Read the tab-separated file with a header row
    val frame = spark.read
      .option("header", true)
      .option("delimiter", "\t")
      .csv("/root/data.csv")
    frame.createTempView("data")

    // UDF that strips every run of '@' and '$' characters
    spark.udf.register("cleanData", (x: String) => {
      x.replaceAll("\\@+", "").replaceAll("\\$+", "")
    })

    spark.sql(
      """
        |select cleanData(TRIP_ID) as TRIP_ID, cleanData(CALL_TYPE) as CALL_TYPE, cleanData(ORIGIN_CALL) as ORIGIN_CALL,
        |cleanData(TAXI_ID) as TAXI_ID, cleanData(ORIGIN_STAND) as ORIGIN_STAND, cleanData(TIMESTAMP) as TIMESTAMP,
        |cleanData(POLYLINE) as POLYLINE
        |from data
      """.stripMargin).show()
    /**********end**********/
    spark.stop()
  }
}
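The UDF-plus-SQL style above follows the exercise template. The same cleaning can also be done with Spark's built-in regexp_replace column function, with no UDF registration; a minimal sketch, generalized over whatever columns the file contains:

    import org.apache.spark.sql.functions.{col, regexp_replace}

    // Apply the same "[@$]+" stripping rule to every column of the DataFrame;
    // inside a character class, '$' is a literal and needs no escaping
    val cleaned = frame.columns.foldLeft(frame) { (df, c) =>
      df.withColumn(c, regexp_replace(col(c), "[@$]+", ""))
    }
    cleaned.show()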
II. Data Analysis
Use Spark SQL to complete the data analysis. The exercise breaks down into four steps:
1. Convert the Unix TIMESTAMP field into a date.
2. Split the POLYLINE field into startLocation and endLocation columns.
3. Compute the trip duration, defined as (number of points - 1) × 15 seconds.
4. Count the trips per call type per day, sorted ascending by CALL_TYPE and TIME.
import com.alibaba.fastjson.JSON
import org.apache.spark.sql.SparkSession

object Step2 {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("Step2").master("local").getOrCreate()
    spark.sparkContext.setLogLevel("error")
    /**********begin**********/
    val frame = spark.read
      .option("header", true)
      .option("delimiter", "\t")
      .csv("/root/data2.csv")
    frame.createTempView("data")

    // 1. Convert the Unix timestamp into a date
    spark.sql("select TRIP_ID,CALL_TYPE,ORIGIN_CALL,TAXI_ID,ORIGIN_STAND,POLYLINE, from_unixtime(TIMESTAMP,'yyyy-MM-dd') as TIME from data").createTempView("data2")
    spark.sql("select * from data2").show()

    // 2. Split the POLYLINE field into startLocation and endLocation
    spark.udf.register("startLocation", (x: String) => {
      val arr = JSON.parseArray(x)
      arr.get(0).toString
    })
    spark.udf.register("endLocation", (x: String) => {
      val arr = JSON.parseArray(x)
      arr.get(arr.size() - 1).toString
    })
    spark.sql(
      """
        |select TRIP_ID,CALL_TYPE,ORIGIN_CALL,TAXI_ID,ORIGIN_STAND,POLYLINE,TIME,
        |startLocation(POLYLINE) as startLocation, endLocation(POLYLINE) as endLocation
        |from data2
      """.stripMargin).createTempView("data3")
    spark.sql("select * from data3").show()

    // 3. Compute the duration: total travel time is defined as (number of points - 1) × 15 seconds.
    //    For example, a trip whose POLYLINE has 101 points lasts (101 - 1) * 15 = 1500 seconds.
    spark.udf.register("timeLen", (x: String) => {
      (JSON.parseArray(x).size() - 1) * 15
    })
    spark.sql(
      """
        |select TRIP_ID,CALL_TYPE,ORIGIN_CALL,TAXI_ID,ORIGIN_STAND,POLYLINE,TIME,
        |startLocation(POLYLINE) as startLocation, endLocation(POLYLINE) as endLocation,
        |timeLen(POLYLINE) as timeLen
        |from data3
      """.stripMargin).createTempView("data4")
    spark.sql("select * from data4").show()

    // 4. Count each call type per day, ordered ascending by CALL_TYPE and TIME
    spark.sql(
      """
        |select CALL_TYPE, TIME, count(1) as num from data4 group by TIME,CALL_TYPE order by CALL_TYPE,TIME
      """.stripMargin).show()
    /**********end**********/
    spark.stop()
  }
}
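The fastjson UDFs above re-parse POLYLINE on every call. If you prefer to stay inside Spark's built-in functions, the same three derived columns can be computed with from_json, element_at, and size (element_at with a negative index needs Spark 2.4+). A sketch, reusing the frame DataFrame from the solution above and assuming POLYLINE is a JSON array of [longitude, latitude] pairs:

    import org.apache.spark.sql.functions._
    import org.apache.spark.sql.types._

    // Parse POLYLINE once into an array<array<double>> column, then derive
    // the start point, the end point, and the (points - 1) * 15 second duration
    val points = from_json(col("POLYLINE"), ArrayType(ArrayType(DoubleType)))
    val enriched = frame
      .withColumn("startLocation", points.getItem(0))
      .withColumn("endLocation", element_at(points, -1))
      .withColumn("timeLen", (size(points) - 1) * 15)
    enriched.show()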
III. Taxi Trajectory Chart Display
Use a Java back end plus a front-end charting library to write a program that displays the results as charts. For this you need to understand that the visualization splits into a front end and a back end, following the MVC design pattern:
M layer (Model):
package net.educoder.app.mapper;

import org.apache.ibatis.annotations.Mapper;
import org.apache.ibatis.annotations.Select;

import java.util.List;

@Mapper
public interface MainMapper {

    // Reference example: trip counts for one call type, ordered by time
    @Select("SELECT _num FROM taxi_trend WHERE _taxi = #{type} ORDER BY _time")
    List<Integer> findTaxiTrendNumByType(String type);

    /**********begin**********/
    // The distinct dates on the time axis
    // (method name and return type assumed; the original snippet breaks off here)
    @Select("SELECT _time FROM taxi_trend GROUP BY _time")
    List<String> findTaxiTrendTime();
    /**********end**********/
}
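The taxi_trend table read by this mapper has to be populated from the Spark job first. A minimal sketch, assuming a MySQL database named taxi and mapping the step-4 aggregation (CALL_TYPE, TIME, num) onto the mapper's columns (_taxi, _time, _num); the JDBC URL and credentials are placeholders, not from the source:

    // Rename the aggregation columns to match the taxi_trend schema above
    val trend = spark.sql(
      "select CALL_TYPE as _taxi, TIME as _time, count(1) as _num " +
      "from data4 group by TIME, CALL_TYPE order by CALL_TYPE, TIME")

    // Append the daily call-type counts into MySQL for the chart back end
    trend.write
      .format("jdbc")
      .option("url", "jdbc:mysql://localhost:3306/taxi") // assumed connection URL
      .option("dbtable", "taxi_trend")
      .option("user", "root")                            // placeholder credentials
      .option("password", "123456")                      // placeholder credentials
      .mode("append")
      .save()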