Home Development for Android Reactive Bluetooth operation in the real world

Reactive Bluetooth operation in the real world

by admin

A little preface, or what’s the pain

Lately, I’ve been actively working on applications that have Bluetooth modules on not-very-good-designed protocols with custom devices, which periodically adds to my interesting uh-huh, what’s up problems.

Since I am a sincere fan of reactivity in applications, such problems had to be solved by my own efforts, since there are simply no solutions on the web. None at all. So, this is what I want to tell you about.

Dangers on the path of the Jedi

The first important thing for a developer to keep in mind when working with Bluetooth is that packets can get corrupted along the way. Also – they can be accompanied by noise. And this is not one case in a million, such phenomena can occur quite often, and they need to be handled. And bluetooth can disconnect, or not connect, or pretend to connect, but in fact we know that doesn’t mean anything…

As an example of a solution to these problems, let’s design a micro-framework for processing events that are deterministic by type using a header (the first N bytes) and validated using some simple checksum. In order not to overload the code, let’s assume that the protocol’s header has a fixed size. We will split all packets into two types: fixed-length and dynamic byte-length.

Designing

Let’s start by describing the possible events in the application. So, the general abstraction will look something like this, subject to the accepted constraints :

sealed class Event {val headSize: Int = 2abstract val head: ByteArrayabstract fun isCorrupted(): Boolean//To be continued}

Next, once we have defined constant property sets for all packages, we need to somehow formalize the conditions under which we :

  1. Let’s consider that the package belongs to some type
  2. Should add a byte to the buffer because the packet is not yet built
  3. Should kill the buffer if some conditions have not been met to build it (this point is more of a safety measure, it is better to add logging during testing to check if the other conditions are met)
  4. Try to build a package from the buffer and check its validity

These four conditions lead us to the following interface :

interface EventMatcher {val headSize: Intfun matches(packet: ByteBuffer): Booleanfun create(packet: ByteBuffer): Eventfun shouldBuffer(packet: ByteBuffer): Booleanfun shouldDrop(packet: ByteBuffer): Boolean}

Let’s create a component that will provide would say handy, but I’ll leave that up to you proxy interface to our matchers for all existing types, nothing outstanding, code is under the cat :

Proxy matcher

class EventMatchersAdapter {private val matchers = mutableMapOf<KClass<out Event> , EventMatcher> ()fun register(event: KClass<out Event> , matcher: EventMatcher)= apply { matchers.put(event, matcher) }fun unregister(event: KClass<out Event> )= apply { matchers.remove(event) }fun knownEvents(): List<KClass<out Event> >= matchers.keys.toList()fun matches(packet: ByteBuffer, event: KClass<out Event> ): Boolean= matchers[event]?.matches(packet) ?: falsefun shouldBuffer(packet: ByteBuffer, event: KClass<out Event> ): Boolean= matchers[event]?.shouldBuffer(packet) ?: falsefun shouldDrop(packet: ByteBuffer, event: KClass<out Event> ): Boolean= matchers[event]?.shouldDrop(packet) ?: falsefun create(packet: ByteBuffer, event: KClass<out Event> ): Event?= matchers[event]?.create(packet)}

In packets, let’s describe a way to determine whether a given packet has been corrupted or not. It’s a pretty convenient approach which doesn’t have to suffer too much from a poorly designed protocol where the engineer thought of throwing you a hundred ways to check packets for correctness, several of each.

An example of a fixed length package

data class A(override val head: ByteArray, val payload: ByteArray, val checksum: Byte): Event() {companion object {//(two bytes of head) + (2 bytes of payload) + (byte of checksum)@JvmStatic val length = 5.toByte()@JvmStatic val headValue = byteArrayOf(0x00, 0x00)@JvmStatic val matcherValue = object: EventMatcher {override val headSize: Int = 2override fun matches(packet: ByteBuffer): Boolean {if(packet.position() == 0) return trueif(packet.position() == 1) return packet[0] == headValue[0]return packet[0] == headValue[0] packet[1] == headValue[1]}override fun create(packet: ByteBuffer): A {packet.rewind()return A(ByteArray(2, { packet.get() }), ByteArray(2, { packet.get() }), packet.get())}override fun shouldBuffer(packet: ByteBuffer): Boolean= packet.position() < lengthoverride fun shouldDrop(packet: ByteBuffer): Boolean= packet.position() > length}}override fun isCorrupted(): Boolean = checksumOf(payload) != checksumoverride fun equals(other: Any?): Boolean {if(other as? A == null) return falseother as Areturn Arrays.equals(head, other.head) Arrays.equals(payload, other.payload) checksum == other.checksum}override fun hashCode(): Int {var result = Arrays.hashCode(head)result = result * 31 + Arrays.hashCode(payload)result = result * 31 + checksum.hashCode()return result}}

An example of a package with dynamic length

data class C(override val head: ByteArray, val length: Byte, val payload: ByteArray, val checksum: Byte): Event() {companion object {@JvmStatic val headValue = byteArrayOf(0x01, 0x00)@JvmStatic val matcherValue = object: EventMatcher {override val headSize: Int = 2override fun matches(packet: ByteBuffer): Boolean {if(packet.position() == 0) return trueif(packet.position() == 1) return packet[0] == headValue[0]return packet[0] == headValue[0] packet[1] == headValue[1]}override fun create(packet: ByteBuffer): C {packet.rewind()val msb = packet.get()val lsb = packet.get()val length = packet.get()return C(byteArrayOf(msb, lsb), length, packet.take(3, length.toPositiveInt()), packet.get())}override fun shouldBuffer(packet: ByteBuffer): Boolean= when(packet.position()) {in 0..2 -> trueelse -> packet.position() < (packet[2].toPositiveInt() + 4) //increase by (2 bytes of head) + (1 byte of length) + (1 byte of checksum)}override fun shouldDrop(packet: ByteBuffer): Boolean= when(packet.position()) {in 0..2 -> falseelse -> packet.position() > (packet[2].toPositiveInt() + 4) //increase by (2 bytes of head) + (1 byte of length) + (1 byte of checksum)}}}override fun isCorrupted(): Boolean = checksumOf(payload) != checksumoverride fun equals(other: Any?): Boolean {if(other as? C == null) return falseother as Creturn Arrays.equals(head, other.head) length == other.length Arrays.equals(payload, other.payload) checksum == other.checksum}override fun hashCode(): Int {var result = Arrays.hashCode(head)result = result * 31 + length.hashCode()result = result * 31 + Arrays.hashCode(payload)result = result * 31 + checksum.hashCode()return result}}

Next, we need to describe the packet reading algorithm itself, and one that will :

  1. Supports several different types
  2. Handle packet damage for us
  3. Will be friends with Flowable

Implementation of the algorithm hidden behind the Subscriber interface :

class EventsBridge(private val adapter: EventMatchersAdapter, private val emitter: FlowableEmitter<Event> , private val bufferSize: Int = 128): DisposableSubscriber<Byte> () {private val buffers: Map<KClass<out Event> , ByteBuffer>= mutableMapOf<KClass<out Event> , ByteBuffer> ().apply {for(knownEvent in adapter.knownEvents()) {put(knownEvent, ByteBuffer.allocateDirect(bufferSize))}}.toMap()override fun onError(t: Throwable) {emitter.onError(t)}override fun onComplete() {emitter.onComplete()}override fun onNext(t: Byte) {for((key, value) in buffers) {value.put(t)adapter.knownEvents().filter { it == key }.forEach {if (adapter.matches(value, it)) {when {adapter.shouldDrop(value, it) -> {value.clear()}!adapter.shouldBuffer(value, it) -> {val event = adapter.create(value, it)if (!emitter.isCancelled event != null !event.isCorrupted()) {release()emitter.onNext(event)}else {value.clear()}}}}else {value.clear()}}}}private fun release() {for(buffer in buffers) buffer.value.clear()}}

Use

Consider running unit tests as an example :

A simple test to work for one type of package

@Testfun test_single_fixedLength() {val adapter = EventMatchersAdapter().register(Event.A::class, Event.A.matcherValue)val packetA = generateCorrectPacketA()val testSubscriber = TestSubscriber<Event> ()Flowable.create<Event> ({ emitter ->val bridge = EventsBridge(adapter, emitter)Flowable.create<Byte> ({ byteEmitter ->for(byte in packetA) { byteEmitter.onNext(byte) } }, BackpressureStrategy.BUFFER).subscribe(bridge)}, BackpressureStrategy.BUFFER).subscribe(testSubscriber)testSubscriber.assertNoErrors()testSubscriber.assertValue { event ->event is Event.A !event.isCorrupted()}}

Test with lots of noise, multiple packet types

@Testfun test_multiple_dynamicLength_mixed_withNoise() {val adapter = EventMatchersAdapter().register(Event.C::class, Event.C.matcherValue).register(Event.D::class, Event.D.matcherValue)val packetC1 = generateCorrectPacketC()val packetD1 = generateCorrectPacketD()val packetD2 = generateCorruptedPacketD()val packetC2 = generateCorruptedPacketC()val testSubscriber = TestSubscriber<Event> ()val random = Random()Flowable.create<Event> ({ emitter ->val bridge = EventsBridge(adapter, emitter)Flowable.create<Byte> ({ byteEmitter -> for(b in 0..100) { byteEmitter.onNext(random.nextInt().toByte()) }for(byte in packetC1) { byteEmitter.onNext(byte) }for(b in 0..100) { byteEmitter.onNext(random.nextInt().toByte()) }for(byte in packetD1) { byteEmitter.onNext(byte) }for(b in 0..100) { byteEmitter.onNext(random.nextInt().toByte()) }for(byte in packetD2) { byteEmitter.onNext(byte) }for(b in 0..100) { byteEmitter.onNext(random.nextInt().toByte()) }for(byte in packetC2) { byteEmitter.onNext(byte) }for(b in 0..100) { byteEmitter.onNext(random.nextInt().toByte()) }}, BackpressureStrategy.BUFFER).subscribe(bridge)}, BackpressureStrategy.BUFFER).subscribe(testSubscriber)testSubscriber.assertNoErrors()testSubscriber.assertValueCount(2)}

Generating packages for tests

private fun generateCorrectPacketB(): ByteArray {val rnd = Random()val payload = byteArrayOf(rnd.nextInt().toByte(), rnd.nextInt().toByte(), rnd.nextInt().toByte(), rnd.nextInt().toByte())return byteArrayOf(Event.B.headValue[0], Event.B.headValue[1], payload[0], payload[1], payload[2], payload[3], checksumOf(payload))}private fun generateCorrectPacketC(): ByteArray {val rnd = Random()val payload = List(rnd.nextInt(16), { index ->rnd.nextInt().toByte()}).toByteArray()return ByteArray(4 + payload.size, { index ->when(index) {0 -> Event.C.headValue[0]1 -> Event.C.headValue[1]2 -> payload.size.toByte()in 3..(4 + payload.size - 2) -> payload[index - 3]4 + payload.size - 1 -> checksumOf(payload)else -> 0.toByte()}})}private fun generateCorruptedPacketB(): ByteArray {val rnd = Random()val payload = byteArrayOf(rnd.nextInt().toByte(), rnd.nextInt().toByte(), rnd.nextInt().toByte(), rnd.nextInt().toByte())return byteArrayOf(Event.B.headValue[0], Event.B.headValue[1], payload[0], payload[1], payload[2], payload[3], (checksumOf(payload) + 1.toByte()).toByte())}private fun generateCorruptedPacketC(): ByteArray {val rnd = Random()val payload = List(rnd.nextInt(16), { _ -> rnd.nextInt().toByte() }).toByteArray()return ByteArray(4 + payload.size, { index ->when(index) {0 -> Event.C.headValue[0]1 -> Event.C.headValue[1]2 -> payload.size.toByte()in 3..(4 + payload.size - 2) -> payload[index - 3]else -> (checksumOf(payload) + 1.toByte()).toByte()}})}

A simple chexumma used for testing

inline fun checksumOf(data: ByteArray): Byte {var result = 0x00.toByte()for(b in data) {result = (result + b).toByte()}return (result.inv() + 1.toByte()).toByte()}

And what was the point of all this?

With this example, I would like to show how easy it is to maintain modularity in handling almost arbitrary events, by the way, not necessarily coming from a Bluetooth source (no Bluetooth-dependent code yet), while avoiding possible packet corruption and link noise.

So what’s next?

Let’s make a little wrapper over RxBluetooth that will allow us to work with different connections in a reactive style, listening to different sets of events.

The entire code can be roughly divided into three sets of components: two services and one repository.
The services will provide connection and work with connection data, respectively, while the repository will provide an abstraction for working with specific connections and act as an implicit connection flyweight.

The interfaces will roughly be as follows :

interface ConnectivityService {fun sub(service: UUID): Observable<DataService>}interface DataService {fun sub(): Flowable<Event>fun write(data: ByteArray): Booleanfun dispose()}interface DataRepository {fun sub(serviceUUID: UUID): Flowable<Event>fun write(serviceUUID: UUID, data: ByteArray): Flowable<Boolean>fun dispose()}

And, accordingly, the realizations under the cat.

ConnectivityServiceImpl

class ConnectivityServiceImpl(private val bluetooth: RxBluetooth, private val events: EventMatchersAdapter, private val timeoutSeconds: Long = 15L): ConnectivityService {override fun sub(service: UUID): Observable<DataService> = when(bluetooth.isBluetoothEnabled bluetooth.isBluetoothAvailable) {false -> Observable.empty()else -> {ensureBluetoothNotDiscovering()bluetooth.startDiscovery()bluetooth.observeDevices().filter { device -> device.uuids.contains(ParcelUuid(service)) }.timeout(timeoutSeconds, TimeUnit.SECONDS).take(1).doOnNext { _ -> ensureBluetoothNotDiscovering() }.doOnError { _ -> ensureBluetoothNotDiscovering() }.doOnComplete { -> ensureBluetoothNotDiscovering() }.flatMap { device -> bluetooth.observeConnectDevice(device, service) }.map { connection -> DataServiceImpl(BluetoothConnection(connection), events) }}}private fun ensureBluetoothNotDiscovering() {if(bluetooth.isDiscovering) {bluetooth.cancelDiscovery()}}}

DataServiceImpl

class DataServiceImpl constructor(private val connection: BluetoothConnection, private val adapter: EventMatchersAdapter): DataService {override fun sub(): Flowable<Event> = Flowable.create<Event> ({ emitter ->val underlying = EventsBridge(adapter = adapter, emitter = emitter)emitter.setDisposable(object: MainThreadDisposable() {override fun onDispose() {if(!underlying.isDisposed) {underlying.dispose()}}})connection.observeByteStream().subscribe(underlying)}, BackpressureStrategy.BUFFER)override fun write(data: ByteArray): Boolean= connection.send(data)override fun dispose()= connection.closeConnection()}

DataRepositoryImpl

class DataRepositoryImpl(private val connectivity: ConnectivityService): DataRepository {private val services = ConcurrentHashMap<UUID, DataService> ()override fun sub(serviceUUID: UUID): Flowable<Event>= serviceOf(serviceUUID).flatMap { service -> service.sub() }override fun write(serviceUUID: UUID, data: ByteArray): Flowable<Boolean>= serviceOf(serviceUUID).map { service -> service.write(data) }override fun dispose() {for((_, service) in services) {service.dispose()}}private fun serviceOf(serviceUUID: UUID): Flowable<DataService> = with(services[serviceUUID]) {when(this) {null -> connectivity.sub(serviceUUID).doOnNext { service -> services.put(serviceUUID, service) }.toFlowable(BackpressureStrategy.BUFFER)else -> Flowable.just(this)}}}

And so, in a minimal number of lines, we get to do what would normally be stretched into creepy call chains, or callback hulls about the following :

repository.sub(UUID.randomUUID()).subscribeOn(Schedulers.io()).observeOn(AndroidSchedulers.mainThread()).subscribe { event ->when(event) {is Event.A -> doSomeStuffA(event)is Event.B -> doSomeStuffB(event)is Event.C -> doSomeStuffC(event)is Event.D -> doSomeStuffD(event)}}

11 lines for listening to four events from an arbitrary device, not bad, is it?)

In lieu of a conclusion

If anyone reading this has a desire to look at the sources, they are are here

If anyone wants to see how other rules for generating packets from raw bytes would fit in, let me know and we’ll try to add them.

UPD: formatted in microframe with optin bridges in ReactiveX, coroutines, and a clean Kotlin implementation.

You may also like