授权协议 Readme
开发语言 Java
操作系统 跨平台
软件类型 开源软件
所属分类 云计算、 云原生
软件简介
车流量监控
前言
目的 1.对业务场景抽象,熟练Spark编码 2.增加自定义累加器,自定义UDF 3.Spark 优化方式
项目 数据处理架构
模块介绍
卡扣流量分析 Spark Core
卡扣车流量转化率 Spark Core
各区域车流量最高top5的道路统计 SparkSQL
稽查布控,道路实时拥堵统计 SparkStreaming
hive表
monitor_flow_action表 – date 日期 天 – monitor_id 卡口号 – camera_id 摄像头编号 – car 车牌 – action_time 某个摄像头拍摄时间 s – speed 通过卡扣的速度 – road_id 道路id – area_id 区域ID
monitor_camera_info表 某一个卡扣对应的摄像头编号 – monitor_id:卡扣编号 – camera_id:摄像头编号
数据加载hive中 1). 创建表,加载数据load data Data2File
hive -f createHiveTab.sql
2). 集群中提交spark作业,使用代码生成到hive Data2Hive
大数据开发流程
数据调研(对底层的数据的表结构进行调研,分析和研究)
需求分析(与PM讨论需求,画原型图 axure)
基于讨论出来的结果做出技术方案(某个难点用什么技术,数据库选型)
具体实施
功能点
根据使用者(平台使用者)指定的某些条件,筛选出指定的一批卡扣信息(比如根据区域、时间筛选)
检测卡扣状态,对于筛选出来的所有的卡口(不代表一个摄像头)信息统计 • 卡口正常数 • 异常数 • camera的正常数 • camera的异常数 • camera的详细信息( monitor_id:camera_id)
车流量最多的TonN卡扣号 • 获取每一个卡扣的详细信息( Top5 )
随机抽取N个车辆信息,对这些数据可以进行多维度分析(因为随机抽取出来的N个车辆信息可以很权威的代表整个 区域的车辆)
计算出经常高速通过的TopN卡口 (查看哪些卡扣经常被高速通过,高速,中速,正常,低速 根据三个速度段进行四 次排序,高速通过的车辆数相同就比较中速通过的车辆数,以此来推)
跟车分析
需求分析
按条件筛选卡扣信息
• 可以指定 不同的条件,时间范围、区域范围、卡扣号等 可以灵活的分析不同区域的卡扣信息
监测卡扣状态
• 对符合条件的卡扣信息,可以动态的检查每一个卡扣的状态,查看卡扣是否正常工作,也可以查看摄像头
车流量最多的TonN卡扣
• 查看哪些卡扣的车流量最高,为什么会出现这么高的车流量。分析原因,例如今天出城的车辆非常多,啥原因,今天进 城的车辆非常多,啥原因? 要造反? 这个功能点里面也会拿到具体的车辆的信息,分析一下是京牌车造成的还是外地 车牌?
在符合条件的卡扣信息中随机抽取N个车辆信息
• 随机抽取N辆车的信息,可以权威的代表整个区域的车辆,这时候可以分析这些车的轨迹,看一下在不同的时间点车辆 的流动方向。以便于道路的规划。
计算出经常高速通过的TopN卡口
• 统计出是否存在飙车现象,或者经常进行超速行驶,可以在此处安装违章拍摄设备
跟车分析
• 计算出所有车是否被跟踪过,然后将结果存储在MySQL中,以便后期进行查询
项目分析
monitor_flow_action 监控数据表
monitor_camera_info 卡扣与摄像头基本关系表
1.卡扣监控
统计: 正常的卡扣个数,异常的卡扣个数,正常的摄像头个数,异常的摄像头个数,异常的摄像头详细信息
正常卡扣个数:
monitor_camera_info 基本关系表中卡扣与摄像头的关系与在monitor_flow_action 监控数据表 中,卡扣与摄像头的关系完全对应上 0001:11111,22222 0001 11111 xxx 0001 22222 xxx RDD思路-正常的卡扣数为例: monitor_flow_action表 -> RDD
异常的卡扣个数:
1.monitor_camera_info 基本关系表中 卡扣 与摄像头的关系,在监控的数据表中 一条都没有对应。 2.monitor_camera_info 基本关系表中 卡扣 与摄像头的关系,在监控的数据表中 部分数据有对应。
正常的摄像头个数:
异常的摄像头个数:
异常的摄像头详细信息:0001:11111,22222,33333
~0004:76789,27449,87911,61106,45624,37726,09506 ~0001:70037,23828,34361,92206,76657,26608 ~0003:36687,99260,49613,97165 ~0006:82302,11645,73565,36440 ~0002:60478,07738,53139,75127,16494,48312 ~0008:34144,27504,83395,62222,49656,18640 ~0007:19179,72906,55656,60720,74161,85939,51743,40565,13972,79216,35128,27369,84616,09553 ~0000:67157,85327,08658,57407,64297,15568,31898,36621 ~0005:09761,12853,91031,33015,52841,15425,45548,36528
注意:
求个数: 累加器实现(并行 分布式)
异常的摄像头信息,用累加器实现,无非拼的是字符串
更新累加器与take使用时,take算子可以触发多个job执行,可以造成累加器重复计算。
./spark-submit --master spark://node1:7077,node2:7077 --jars ../lib/fastjson-1.2.11.jar,../lib/mysql-connector-java-5.1.6.jar --class MonitorFlowAnalyze ../lib/Test.jar 1 ~0001:13846,54785,51995,64341,45994,32228,82054,87746 ~0003:38780,08844,03281,07183,50318,87000,16722,11604,26508,45523,46380 ~0007:61833,19140,38387 ~0005:63920,23464,37389,01219,96765,24844,32101,24141~ ~0004:60778,35444,35403,68811,73819,81893 ~0006:09621,67028,96375,60036,91237,53743,10305 ~0002:24694,01172,25945,79625,83215,72235,26855 ~0008:24630,40432,96808,78708,28294 ~0000:68070,12865,49505,26035,36931,38053,91868
2.通过车辆数最多的topN卡扣
3.统计topN卡扣下经过的所有车辆详细信息
4.车辆通过速度相对比较快的topN卡扣
车速: 120=
5.卡扣“0001”下所有车辆轨迹
1.过滤日期范围内 卡扣“0001”下 有哪些车辆? 2.过滤日期范围内 这些车辆经过卡扣的时间,按照时间升序排序
6.车辆碰撞
01,02中这一天同时出现的车辆 01:(car,row) 02:(car,row) (car,row).join(car,row) 01:car 02:car car02.intersection(car02)
7.随机抽取车辆
在一天中要随机抽取100辆车,抽取的车辆可以权威代表当天交通运行情况。 假如这天一共有10000辆车,要随机抽取100辆车: sample(true,0.1,seed) 00~01 100 100/10000*100 = 1 01~02 100 1 02~03 100 1 04~05 200 2 05~06 200 2 06~07 300 3 08~09 500 5 09~10 200 2 10~11 200 2 11~12 300 3 12~13 500 5 13~14 700 7 。。 。。 。。
8.卡扣流量转换率
卡扣流量转换率,是指车辆运行连续卡口的转换率 如:一车辆经过卡口的轨迹为:0001,0002,0004,0001,0002,0003,0001,0002。 0001,0002:卡扣0001到卡扣0002的转换率为:经过卡扣0001,又经过卡扣0002的次数/经过卡扣0001的次数,针对上面的例子, 经过卡口0001的次数为:3次,经过卡口0001,又经过卡口0002的次数为:3次,那么卡扣0001到卡扣0002的转换率为:3/3=100%,同理: 0001,0002,0003 代表卡扣0001,0002到卡扣0003的转换率。 0001,0002,0003,0004代表卡扣0001,0002,0003到卡扣00004的转换率。 MonitorOneStepConvertRateAnalyze.java
一辆车的轨迹: 0001->0002->0003->0001->0002->0004->0005->0001 0001,0002----卡扣0001到卡扣0002 的车流量转化率:通过卡扣0001又通过卡扣0002的次数/通过卡扣0001的次数 2/3 0001,0002,0003 ---- 卡扣0001,0002到0003的车辆转换率:通过卡扣0001,0002,0003的次数 /通过卡扣0001,0002 0001,0002,0003,0004 -----卡扣0001,0002,0003到0004的车辆转换率:通过卡扣0001,0002,0003,0004的次数 /通过卡扣0001,0002,0003 0001,0002,0003,0004,0005 -----卡扣0001,0002,0003,0004到0005的车辆转换率:通过卡扣0001,0002,0003,0004,0005的次数 /通过卡扣0001,0002,0003,0004的次数 手动输入卡扣号: 0001,0002,0003,0004,0005 求: 0001,0002 0001,0002,0003 0001,0002,0003,0004 0001,0002,0003,0004,0005
粤A11111: ("0001",100) ("0001,0002",30) ("0001,0002,0003",10) 粤B22222: ("0001",200) ("0001,0002",100) ("0001,0002,0003",70) ("0001,0002,0003,0004",10)
9.实时道路拥堵情况
计算一段时间内卡扣下通过的车辆的平均速度。 这段时间不能太短,也不能太长。就计算当前时间的前五分钟 当前卡扣下通过所有车辆的平均速度。 每隔5s 计算一次当前卡扣过去5分钟 所有车辆的平均速度。 SparkStreaming 窗口函数 window lenth:5min slide interval:5s
10.动态改变广播变量
`transform` `foreachRDD`
11.统计每个区域中车辆最多的前3道路
道路车辆:道路中的每个卡扣经过的车辆累加 天河区 元岗路1 0001=30,0002=50,0003=100,0004=20 200 天河区 元岗路2 0005=50,0006=100 150 天河区 元岗路3 100 越秀区 xxx1 200 越秀区 xxx2 150 越秀区 xxx3 100 SparkSQL Hive 表 --t1 : monitor_id car road_id area_id ----- areaId area_name road_id monitor_id car ------ tmp_car_flow_basic sql: select area_name,road_id,count(car) as car_count,UDAF(monitor_id) as monitor_infos from t1 group by area_name,road_id ---- tmp_area_road_flow_count 开窗函数:row_number() over (partition by xxx order by xxx ) rank select area_name,road_id,car_count,monitor_infos, row_number() over (partition by area_name order by car_count desc ) rank from tmp_area_road_flow_count ---- tmp select area_name,road_id,car_count,monitor_infos from tmp where rank <=3 ----------------------------------------------------------------------- 总sql: select area_name,road_id,car_count,monitor_infos from ( select area_name,road_id,car_count,monitor_infos, row_number() over (partition by area_id order by carCount desc ) rank from ( select area_name,road_id,count(car) as car_count ,UDAF(monitor_id) as monitor_infos from t1 group by area_name,road_id ) t2 ) t3 where rank <=3
=================================================================================================================== sql: select prefix_area_name_road_id,count(car) as car_count,UDAF(monitor_id) as monitor_infos from t1 group by prefix_area_name_road_id ---- tmp_area_road_flow_count
select area_name,road_id,car_count,monitor_infos, row_number() over (partition by area_name order by car_count desc ) rank from tmp_area_road_flow_count ---- tmp select area_name,road_id,car_count,monitor_infos from tmp where rank <=3 ----------------------------------------------------------------------- 总sql: select area_name,road_id,car_count,monitor_infos from ( select area_name,road_id,car_count,monitor_infos, row_number() over (partition by area_id order by carCount desc ) rank from ( select area_name,road_id,count(car) as car_count ,UDAF(monitor_id) as monitor_infos from t1 group by area_name,road_id ) t2 ) t3 where rank <=3
车辆轨迹
统计卡扣0001下所有车辆的轨迹 -- take(20)
各区域车流量最高topN的道路统计 1.会将小于spark.sql.autoBroadcastJoinThreshold值(默认为10M)的表广播到executor节点,不走shuffle过程,更加高效。 sqlContext.setConf("spark.sql.autoBroadcastJoinThreshold", "20971520"); //单位:字节 2.在Hive中执行sql文件: hive –f sql.sql 3.提交命令: --master spark://node1:7077,node2:7077 --jars ../lib/mysql-connector-java-5.1.6.jar,../lib/fastjson-1.2.11.jar --driver-class-path ../lib/mysql-connector-java-5.1.6.jar:../lib/fastjson-1.2.11.jar ../lib/Test.jar 4
3.缉查布控,道路实时拥堵统计 动态改变广播变量的值:可以通过transform和foreachRDD
屏蔽过多黄色警告,忽略java类方法的参数 与注释; File -> Settings -> Editor -> Inspections -> java ->javadoc: 参数不一致的屏蔽: Declaration has problems in Javadoc refere 红色 改成 waring黄色 参数没有注释: Dangling Javadoc comment 去掉勾选 Declaration has Javadoc problems 去掉勾选
声明:本文内容及配图由入驻作者撰写或者入驻合作网站授权转载。文章观点仅代表作者本人,不代表电子发烧友网立场。文章及其配图仅供工程师学习之用,如有内容侵权或者其他违规问题,请联系本站处理。 举报投诉
全部0条评论
快来发表一下你的评论吧 !