电子说
步骤1:俄勒冈科学气象站BLE协议
探索气象站公开的BLE服务的一种简单方法是在智能手机或平板电脑上使用BLE扫描仪应用程序。在这里,我将使用Android上的Nordic Semiconductor的nRF Connect。
对于不熟悉BLE的人们,您可能想开始使用此低功耗蓝牙入门指南。这将帮助您了解GATT,服务和特性之类的术语。
借助BLE扫描器应用程序,您将找到周围的所有BLE设备。并且您会看到俄勒冈科学气象站正在将其自己广告为 IDTW211R 。
如果连接到气象站,您将获得设备公开的服务列表
通用访问UUID:0x1800
通用属性UUID:0x1801
专有服务UUID:74e7fe00-c6a4-11e2-b7a9-0002a5d5c51b
设备信息UUID:0x180A
电池服务UUID:0x180F
对我们而言重要的服务是专有服务(UUID:74e7fe00-c6a4- 11E2-b7a9-0002a5d5c51b)。因此,我们单击该服务并获得该服务公开的特征列表。
我们的应用程序的相关特征是UUID:74e78e10-c6a4-11e2-b7a9-0002a5d5c51b(INDOOR_AND_CH1_TO_3_TH_DATA)。
如属性中所示,此特征不可读,但提供指示服务。而且我们将必须通过相应的客户端特征配置描述符(UUID:0x2902)启用指示。
事实证明,必须为设备的所有特征启用指示/通知,才能开始获取任何指示。
步骤2:将Raspberry Pi3连接到BLE气象站
现在,我们对如何通过BLE检索温度数据有了更好的了解,让我们尝试让Raspberry Pi与气象站进行通讯。
Raspberry Pi上的BLE
我将使用Raspberry Pi3(集成了WLAN/BT控制器)和具有像素2016-09-23的Raspbian Jessie。
bluez (5.23)的版本此Raspbian版本已经过时了,我们必须升级到 bluez 的更高版本才能使用Bluetooth Low Energy。
下载 bluez 5.43 》并安装所需的依赖项:
sudo apt-ge t update
sudo apt-get install -y libglib2.0-dev libdbus-1-dev libudev-dev libical-dev libreadline-dev
wget http://www.kernel .org/pub/linux/bluetooth/bluez-5.43.tar.xz
tar xvf bluez-5.43.tar.xz
配置,构建和安装 bluez
。/configure
make
sudo make install
最后重新启动Raspberry Pi。
现在,让我们使用cmd hciconfig dev
验证蓝牙堆栈是否已启动并且正在运行,然后确认我们可以扫描BLE设备: sudo hcitool lescan
我们应该看到气象站的广告为 IDTW211R 。
下一步是连接到气象站:
sudo gatttool -b -t random -I
您应该获得一个新的命令提示符,其BLE地址位于方括号之间。
connect
您应该获得“连接成功”的提示,并变为彩色
连接后,我们可以运行一些命令以获取有关设备的更多详细信息。例如,主要命令将列出设备公开的服务。
从气象站收集数据
如前所述,开始获取有关在INDOOR_AND_CH1_TO_3_TH_DATA(UUID:74e78e10-c6a4-11e2-b7a9-0002a5d5c51b)中,我们必须启用所有特征的指示/通知。
我们通过在客户端特征配置描述符中写入0x0002(用于通知的0x0001)来启用指示。 (UUID:0x2902)每个特性。写应该以小尾数形式放置,这样: char-write-req 0200
作为回报,您应该开始获得指示/通知。对我们而言,相关的是INDOOR_AND_CH1_TO_3_TH_DATA指示(因此句柄0x0017)。
指示句柄= 0x0017值:01 05 01 15 01 ff 7f ff 7f 7f 7f 7f 7f ff ff 7f 7f 7f 7f 7f 7f
指示句柄= 0x0017值:82 7f 7f 7f 21 01 f8 00 24 01 ba 00 ff 7f ff 7f ff 7f ff 7f
对于每一轮指示,我们得到两个20的数据包每个字节。最高有效字节指示数据类型(类型0或类型1)。有关数据包的更多详细信息,请参见上一张图片。
步骤3:检索气象站数据的Python脚本
现在我们成功地从气象站中获取了数据,让我们自动化了整个过程并理解了数据。
这是一个连接到气象站,检索数据并提取气象站底座和随附的无线传感器的温度。
此脚本利用bluepy库提供了API,以允许从Python访问Bluetooth Low Energy设备。因此,您必须在执行脚本之前安装此模块:https://github.com/IanHarvey/bluepy#installation
用法
可以使用MAC地址执行脚本
在后一种情况下,脚本将执行扫描并查找广告为 IDTW211R 的设备。它必须以root特权执行,因为 bluez 需要 root 特权才能进行扫描操作。
python bleWeatherStation.py [mac-address]
sudo python bleWeatherStation.py
bleWeatherStation.py
#!/usr/bin/python
# -*- coding: utf-8 -*-
# Connect to Oregon Scientific BLE Weather Station
# Copyright (c) 2016 Arnaud Balmelle
#
# This script will connect to Oregon Scientific BLE Weather Station
# and retrieve the temperature of the base and sensors attached to it.
# If no mac-address is passed as argument, it will scan for an Oregon Scientific BLE Weather Station.
#
# Supported Oregon Scientific Weather Station: EMR211 and RAR218HG (and probably BAR218HG)
#
# Usage: python bleWeatherStation.py [mac-address]
#
# Dependencies:
# - Bluetooth 4.1 and bluez installed
# - bluepy library (https://github.com/IanHarvey/bluepy)
#
# License: Released under an MIT license: http://opensource.org/licenses/MIT
import sys
import logging
import time
import sqlite3
from bluepy.btle import *
# uncomment the following line to get debug information
logging.basicConfig(format=‘%(asctime)s: %(message)s’, level=logging.DEBUG)
WEATHERSTATION_NAME = “IDTW211R” # IDTW213R for RAR218HG
class WeatherStation:
def __init__(self, mac):
self._data = {}
try:
self.p = Peripheral(mac, ADDR_TYPE_RANDOM)
self.p.setDelegate(NotificationDelegate())
logging.debug(‘WeatherStation connected !’)
except BTLEException:
self.p = 0
logging.debug(‘Connection to WeatherStation failed !’)
raise
def _enableNotification(self):
try:
# Enable all notification or indication
self.p.writeCharacteristic(0x000c, “x02x00”)
self.p.writeCharacteristic(0x000f, “x02x00”)
self.p.writeCharacteristic(0x0012, “x02x00”)
self.p.writeCharacteristic(0x0015, “x01x00”)
self.p.writeCharacteristic(0x0018, “x02x00”)
self.p.writeCharacteristic(0x001b, “x02x00”)
self.p.writeCharacteristic(0x001e, “x02x00”)
self.p.writeCharacteristic(0x0021, “x02x00”)
self.p.writeCharacteristic(0x0032, “x01x00”)
logging.debug(‘Notifications enabled’)
except BTLEException as err:
print(err)
self.p.disconnect()
def monitorWeatherStation(self):
try:
# Enable notification
self._enableNotification()
# Wait for notifications
while self.p.waitForNotifications(1.0):
# handleNotification() was called
continue
logging.debug(‘Notification timeout’)
except:
return None
regs = self.p.delegate.getData()
if regs is not None:
# expand INDOOR_AND_CH1_TO_3_TH_DATA_TYPE0
self._data[‘index0_temperature’] = ‘’.join(regs[‘data_type0’][4:6] + regs[‘data_type0’][2:4])
self._data[‘index1_temperature’] = ‘’.join(regs[‘data_type0’][8:10] + regs[‘data_type0’][6:8])
self._data[‘index2_temperature’] = ‘’.join(regs[‘data_type0’][12:14] + regs[‘data_type0’][10:12])
self._data[‘index3_temperature’] = ‘’.join(regs[‘data_type0’][16:18] + regs[‘data_type0’][14:16])
self._data[‘index0_humidity’] = regs[‘data_type0’][18:20]
self._data[‘index1_humidity’] = regs[‘data_type0’][20:22]
self._data[‘index2_humidity’] = regs[‘data_type0’][22:24]
self._data[‘index3_humidity’] = regs[‘data_type0’][24:26]
self._data[‘temperature_trend’] = regs[‘data_type0’][26:28]
self._data[‘humidity_trend’] = regs[‘data_type0’][28:30]
self._data[‘index0_humidity_max’] = regs[‘data_type0’][30:32]
self._data[‘index0_humidity_min’] = regs[‘data_type0’][32:34]
self._data[‘index1_humidity_max’] = regs[‘data_type0’][34:36]
self._data[‘index1_humidity_min’] = regs[‘data_type0’][36:38]
self._data[‘index2_humidity_max’] = regs[‘data_type0’][38:40]
# expand INDOOR_AND_CH1_TO_3_TH_DATA_TYPE1
self._data[‘index2_humidity_min’] = regs[‘data_type1’][2:4]
self._data[‘index3_humidity_max’] = regs[‘data_type1’][4:6]
self._data[‘index3_humidity_min’] = regs[‘data_type1’][6:8]
self._data[‘index0_temperature_max’] = ‘’.join(regs[‘data_type1’][10:12] + regs[‘data_type1’][8:10])
self._data[‘index0_temperature_min’] = ‘’.join(regs[‘data_type1’][14:16] + regs[‘data_type1’][12:14])
self._data[‘index1_temperature_max’] = ‘’.join(regs[‘data_type1’][18:20] + regs[‘data_type1’][16:18])
self._data[‘index1_temperature_min’] = ‘’.join(regs[‘data_type1’][22:24] + regs[‘data_type1’][20:22])
self._data[‘index2_temperature_max’] = ‘’.join(regs[‘data_type1’][26:28] + regs[‘data_type1’][24:26])
self._data[‘index2_temperature_min’] = ‘’.join(regs[‘data_type1’][30:32] + regs[‘data_type1’][28:30])
self._data[‘index3_temperature_max’] = ‘’.join(regs[‘data_type1’][34:36] + regs[‘data_type1’][32:34])
self._data[‘index3_temperature_min’] = ‘’.join(regs[‘data_type1’][38:40] + regs[‘data_type1’][36:38])
return True
else:
return None
def getValue(self, indexstr):
val = int(self._data[indexstr], 16)
if val 》= 0x8000:
val = ((val + 0x8000) & 0xFFFF) - 0x8000
return val
def getIndoorTemp(self):
if ‘index0_temperature’ in self._data:
temp = self.getValue(‘index0_temperature’) / 10.0
max = self.getValue(‘index0_temperature_max’) / 10.0
min = self.getValue(‘index0_temperature_min’) / 10.0
logging.debug(‘Indoor temp : %.1f°C, max : %.1f°C, min : %.1f°C’, temp, max, min)
return temp
else:
return None
def getOutdoorTemp(self):
if ‘index1_temperature’ in self._data:
temp = self.getValue(‘index1_temperature’) / 10.0
max = self.getValue(‘index1_temperature_max’) / 10.0
min = self.getValue(‘index1_temperature_min’) / 10.0
logging.debug(‘Outdoor temp : %.1f°C, max : %.1f°C, min : %.1f°C’, temp, max, min)
return temp
else:
return None
def disconnect(self):
self.p.disconnect()
class NotificationDelegate(DefaultDelegate):
def __init__(self):
DefaultDelegate.__init__(self)
self._indoorAndOutdoorTemp_type0 = None
self._indoorAndOutdoorTemp_type1 = None
def handleNotification(self, cHandle, data):
formatedData = binascii.b2a_hex(data)
if cHandle == 0x0017:
# indoorAndOutdoorTemp indication received
if formatedData[0] == ‘8’:
# Type1 data packet received
self._indoorAndOutdoorTemp_type1 = formatedData
logging.debug(‘indoorAndOutdoorTemp_type1 = %s’, formatedData)
else:
# Type0 data packet received
self._indoorAndOutdoorTemp_type0 = formatedData
logging.debug(‘indoorAndOutdoorTemp_type0 = %s’, formatedData)
else:
# skip other indications/notifications
logging.debug(‘handle %x = %s’, cHandle, formatedData)
def getData(self):
if self._indoorAndOutdoorTemp_type0 is not None:
# return sensors data
return {‘data_type0’:self._indoorAndOutdoorTemp_type0, ‘data_type1’:self._indoorAndOutdoorTemp_type1}
else:
return None
class ScanDelegate(DefaultDelegate):
def __init__(self):
DefaultDelegate.__init__(self)
def handleDiscovery(self, dev, isNewDev, isNewData):
global weatherStationMacAddr
if dev.getValueText(9) == WEATHERSTATION_NAME:
# Weather Station in range, saving Mac address for future connection
logging.debug(‘WeatherStation found’)
weatherStationMacAddr = dev.addr
if __name__==“__main__”:
weatherStationMacAddr = None
if len(sys.argv) 《 2:
# No MAC address passed as argument
try:
# Scanning to see if Weather Station in range
scanner = Scanner().withDelegate(ScanDelegate())
devices = scanner.scan(2.0)
except BTLEException as err:
print(err)
print(‘Scanning required root privilege, so do not forget to run the script with sudo.’)
else:
# Weather Station MAC address passed as argument, will attempt to connect with this address
weatherStationMacAddr = sys.argv[1]
if weatherStationMacAddr is None:
logging.debug(‘No WeatherStation in range !’)
else:
try:
# Attempting to connect to device with MAC address “weatherStationMacAddr”
weatherStation = WeatherStation(weatherStationMacAddr)
if weatherStation.monitorWeatherStation() is not None:
# WeatherStation data received
indoor = weatherStation.getIndoorTemp()
outdoor = weatherStation.getOutdoorTemp()
else:
logging.debug(‘No data received from WeatherStation’)
weatherStation.disconnect()
except KeyboardInterrupt:
logging.debug(‘Program stopped by user’)
全部0条评论
快来发表一下你的评论吧 !