# Copyright (c) 2015, Nordic Semiconductor
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are met:
#
# * Redistributions of source code must retain the above copyright notice, this
#   list of conditions and the following disclaimer.
#
# * Redistributions in binary form must reproduce the above copyright notice,
#   this list of conditions and the following disclaimer in the documentation
#   and/or other materials provided with the distribution.
#
# * Neither the name of Nordic Semiconductor ASA nor the names of its
#   contributors may be used to endorse or promote products derived from
#   this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
# DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
# SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
# OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
# Python standard library
from time import sleep
from datetime import datetime, timedelta
import abc
import logging

# Nordic libraries
from nordicsemi.exceptions import NordicSemiException, IllegalStateException
from nordicsemi.dfu.util import int16_to_bytes
from nordicsemi.dfu.dfu_transport import DfuTransport, DfuEvent

# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
# BLE DFU opcodes
class DfuOpcodesBle(object):
    """
    DFU opcodes used during DFU communication with the bootloader.

    See http://developer.nordicsemi.com/nRF51_SDK/doc/7.2.0/s110/html/a00949.html#gafa9a52a3e6c43ccf00cf680f944d67a3
    for further information.
    """

    INVALID_OPCODE = 0
    START_DFU = 1
    INITIALIZE_DFU = 2
    RECEIVE_FIRMWARE_IMAGE = 3
    VALIDATE_FIRMWARE_IMAGE = 4
    ACTIVATE_FIRMWARE_AND_RESET = 5
    SYSTEM_RESET = 6
    REQ_PKT_RCPT_NOTIFICATION = 8
    RESPONSE = 16
    PKT_RCPT_NOTIF = 17
class DfuErrorCodeBle(object):
    """
    DFU error codes used during DFU communication with the bootloader.

    See http://developer.nordicsemi.com/nRF51_SDK/doc/7.2.0/s110/html/a00949.html#gafa9a52a3e6c43ccf00cf680f944d67a3
    for further information.
    """

    SUCCESS = 1
    INVALID_STATE = 2
    NOT_SUPPORTED = 3
    DATA_SIZE_EXCEEDS_LIMIT = 4
    CRC_ERROR = 5
    OPERATION_FAILED = 6

    # Textual description of every known error code. Built once at class
    # creation instead of on every lookup.
    _DESCRIPTIONS = {
        SUCCESS: "SUCCESS",
        INVALID_STATE: "Invalid State",
        NOT_SUPPORTED: "Not Supported",
        DATA_SIZE_EXCEEDS_LIMIT: "Data Size Exceeds Limit",
        CRC_ERROR: "CRC Error",
        OPERATION_FAILED: "Operation Failed",
    }

    @staticmethod
    def error_code_lookup(error_code):
        """
        Return the textual description of an error code received from the peer.

        :param int error_code: Error code to look up
        :return str: Description of the error code, or "UNKOWN ERROR CODE"
                     when the code is not one of the constants of this class
        """
        return DfuErrorCodeBle._DESCRIPTIONS.get(error_code, "UNKOWN ERROR CODE")
# Service UUID. For further information, see the nRF51 SDK documentation V7.2.0:
# http://developer.nordicsemi.com/nRF51_SDK/doc/7.2.0/s110/html/a00071.html#ota_spec_number
UUID_DFU_SERVICE = '000015301212EFDE1523785FEABCD123'

# Characteristic UUIDs
UUID_DFU_PACKET_CHARACTERISTIC = '000015321212EFDE1523785FEABCD123'
UUID_DFU_CONTROL_STATE_CHARACTERISTIC = '000015311212EFDE1523785FEABCD123'

# Descriptor UUID
UUID_CLIENT_CHARACTERISTIC_CONFIGURATION_DESCRIPTOR = 0x2902

# NOTE: If packet receipt notification is enabled, one packet receipt
# notification is received for every NUM_OF_PACKETS_BETWEEN_NOTIF packets sent.
#
# Configuration tip: Increase this value to get fewer packet receipt
# notifications from the DFU target. Set it to 0 to disable packet receipt
# notifications.
NUM_OF_PACKETS_BETWEEN_NOTIF = 10

# Maximum number of bytes handed to send_packet_data() in one call when the
# init packet and the firmware image are split into chunks.
DATA_PACKET_SIZE = 20
class DfuTransportBle(DfuTransport):
    """
    DFU transport that drives the DFU procedure over BLE.

    The DFU steps (start, init packet, firmware upload, validate, activate)
    are implemented here in terms of a few primitives -- send_packet_data,
    send_control_data, get_received_response, is_waiting_for_notification and
    get_last_error -- which are declared abstract and must be supplied by a
    concrete backend. clear_received_response and set_waiting_for_notification
    are no-ops here and are intended to be overridden as well.
    """

    def __init__(self):
        super(DfuTransportBle, self).__init__()

    def open(self):
        """Open the transport; delegates to the base class."""
        super(DfuTransportBle, self).open()

    def is_open(self):
        """Return whatever the base class reports as the open state."""
        return super(DfuTransportBle, self).is_open()

    def close(self):
        """Close the transport; delegates to the base class."""
        super(DfuTransportBle, self).close()

    def _wait_for_condition(self, condition_function, expected_condition_value=True, timeout=10,
                            waiting_for="condition"):
        """
        Poll condition_function until it returns expected_condition_value.

        The condition is re-evaluated every 0.1 seconds. Once it is met, the
        last error code reported by the transport is checked as well.

        :param function condition_function: Callable without arguments that is polled
        :param expected_condition_value: Value condition_function must return to stop waiting
        :param timeout: Maximum time to wait, in seconds (default 10)
        :param str waiting_for: Description of what is awaited; used in log and exception messages
        :raises NordicSemiException: If the timeout expires, or if get_last_error() is not
                                     DfuErrorCodeBle.SUCCESS after the condition has been met
        :raises IllegalStateException: If the transport is no longer open while waiting
        :return:
        """
        # Both values are loop invariant, so compute them once up front.
        timeout_message = "Timeout while waiting for {0}.".format(waiting_for)
        max_wait = timedelta(0, timeout)
        start_time = datetime.now()

        while condition_function() != expected_condition_value:
            if datetime.now() - start_time > max_wait:
                self._send_event(DfuEvent.TIMEOUT_EVENT, log_message=timeout_message)
                raise NordicSemiException(timeout_message)

            if not self.is_open():
                log_message = "Disconnected from device while waiting for {0}.".format(waiting_for)
                raise IllegalStateException(log_message)

            sleep(0.1)

        last_error = self.get_last_error()
        if last_error != DfuErrorCodeBle.SUCCESS:
            error_message = "Error occoured while waiting for {0}. Error response {1}."
            error_code = DfuErrorCodeBle.error_code_lookup(last_error)
            error_message = error_message.format(waiting_for, error_code)
            self._send_event(DfuEvent.ERROR_EVENT, log_message=error_message)
            raise NordicSemiException(error_message)

    @abc.abstractmethod
    def send_packet_data(self, data):
        """
        Send data to the packet characteristic

        :param str data: The data to be sent
        :return:
        """
        pass

    @abc.abstractmethod
    def send_control_data(self, opcode, data=""):
        """
        Send data to the control characteristic

        :param int opcode: The opcode for the operation command sent to the control characteristic
        :param str data: The data to be sent
        :return:
        """
        pass

    @abc.abstractmethod
    def get_received_response(self):
        """
        Returns True if the transport layer has received a response it expected

        :return: bool
        """
        pass

    def clear_received_response(self):
        """
        Clears the received response status, sets it to False.

        This base implementation does nothing.

        :return:
        """
        pass

    @abc.abstractmethod
    def is_waiting_for_notification(self):
        """
        Returns True if the transport layer is waiting for a notification

        :return: bool
        """
        pass

    def set_waiting_for_notification(self):
        """
        Notifies the transport layer that it should wait for notification

        This base implementation does nothing.

        :return:
        """
        pass

    @abc.abstractmethod
    def get_last_error(self):
        """
        Returns the last error code

        :return: DfuErrorCodeBle
        """
        pass

    def _start_dfu(self, program_mode, image_size_packet):
        """
        Send the START DFU command followed by the image size packet, then wait for the response.

        :param int program_mode: Program mode; sent as a single character after the opcode
        :param image_size_packet: Image size packet, sent as-is to the packet characteristic
        :raises NordicSemiException: On timeout or error response (see _wait_for_condition)
        :raises IllegalStateException: If the transport is closed while waiting
        :return:
        """
        logger.debug("Sending 'START DFU' command")
        self.send_control_data(DfuOpcodesBle.START_DFU, chr(program_mode))
        logger.debug("Sending image size")
        self.send_packet_data(image_size_packet)
        self._wait_for_condition(self.get_received_response, waiting_for="response for START DFU")
        self.clear_received_response()

    def send_start_dfu(self, program_mode, softdevice_size=0, bootloader_size=0, app_size=0):
        """
        Start the DFU procedure, retrying once if the device disconnects.

        :param int program_mode: Program mode forwarded with the START DFU command
        :param int softdevice_size: Size passed to DfuTransport.create_image_size_packet
        :param int bootloader_size: Size passed to DfuTransport.create_image_size_packet
        :param int app_size: Size passed to DfuTransport.create_image_size_packet
        :raises IllegalStateException: If the transport cannot be reopened after a disconnect,
                                       or if the retry is disconnected as well
        :raises NordicSemiException: On timeout or error response from the device
        :return:
        """
        super(DfuTransportBle, self).send_start_dfu(program_mode, softdevice_size, bootloader_size, app_size)
        image_size_packet = DfuTransport.create_image_size_packet(softdevice_size, bootloader_size, app_size)
        self._send_event(DfuEvent.PROGRESS_EVENT, progress=0, log_message="Setting up transfer...")

        try:
            self._start_dfu(program_mode, image_size_packet)
        except IllegalStateException:
            # We got disconnected. Try to send Start DFU again in case of buttonless dfu.
            logger.debug("Disconnected during 'START DFU'; reopening transport and retrying")
            self.close()
            self.open()

            if not self.is_open():
                raise IllegalStateException("Failed to reopen transport backend.")

            self._start_dfu(program_mode, image_size_packet)

    def send_init_packet(self, init_packet):
        """
        Transfer the init packet and, if enabled, request packet receipt notifications.

        The init packet is framed by two INITIALIZE DFU commands (payload 0x00
        before the data, 0x01 after it) and sent in chunks of at most
        DATA_PACKET_SIZE bytes. The response is awaited for up to 60 seconds.

        :param init_packet: The init packet data to send; must support len() and slicing
        :raises NordicSemiException: On timeout or error response from the device
        :raises IllegalStateException: If the transport is closed while waiting
        :return:
        """
        super(DfuTransportBle, self).send_init_packet(init_packet)
        init_packet_start = chr(0x00)
        init_packet_end = chr(0x01)

        logger.debug("Sending 'INIT DFU' command")
        self.send_control_data(DfuOpcodesBle.INITIALIZE_DFU, init_packet_start)

        logger.debug("Sending init data")
        for i in range(0, len(init_packet), DATA_PACKET_SIZE):
            self.send_packet_data(init_packet[i:i + DATA_PACKET_SIZE])

        logger.debug("Sending 'Init Packet Complete' command")
        self.send_control_data(DfuOpcodesBle.INITIALIZE_DFU, init_packet_end)
        self._wait_for_condition(self.get_received_response, timeout=60, waiting_for="response for INITIALIZE DFU")
        self.clear_received_response()

        # A value of 0 means packet receipt notifications are disabled, so
        # nothing is requested from the device in that case.
        if NUM_OF_PACKETS_BETWEEN_NOTIF:
            packet = int16_to_bytes(NUM_OF_PACKETS_BETWEEN_NOTIF)
            logger.debug("Send number of packets before device sends notification")
            self.send_control_data(DfuOpcodesBle.REQ_PKT_RCPT_NOTIFICATION, packet)

    def send_firmware(self, firmware):
        """
        Upload the firmware image in chunks of at most DATA_PACKET_SIZE bytes.

        A progress event is emitted whenever the integer percentage changes.
        Before each chunk the method waits until no packet receipt
        notification is outstanding; after every NUM_OF_PACKETS_BETWEEN_NOTIF
        packets (when non-zero) it flags that a notification is expected.

        :param firmware: The firmware image; must support len() and slicing
        :raises NordicSemiException: On timeout or error response from the device
        :raises IllegalStateException: If the transport is closed while waiting
        :return:
        """
        def progress_percentage(part, complete):
            """
            Calculate progress percentage

            :param int part: Offset of the chunk about to be sent
            :param int complete: Total number of bytes to send
            :return: int: Percentage complete, capped at 100
            """
            # Floor division keeps the result an int on Python 3 as well;
            # true division would yield a different float for every packet
            # and trigger a progress event per packet.
            return min(100, (part + DATA_PACKET_SIZE) * 100 // complete)

        super(DfuTransportBle, self).send_firmware(firmware)
        packets_sent = 0
        last_progress_update = -1  # Last progress percentage for which an event was fired
        bin_size = len(firmware)

        logger.debug("Send 'RECEIVE FIRMWARE IMAGE' command")
        self.send_control_data(DfuOpcodesBle.RECEIVE_FIRMWARE_IMAGE)

        for i in range(0, bin_size, DATA_PACKET_SIZE):
            progress = progress_percentage(i, bin_size)
            if progress != last_progress_update:
                self._send_event(DfuEvent.PROGRESS_EVENT, progress=progress, log_message="Uploading firmware")
                last_progress_update = progress

            # Do not send more data while a packet receipt notification is pending.
            self._wait_for_condition(self.is_waiting_for_notification, expected_condition_value=False,
                                     waiting_for="notification from device")

            data_to_send = firmware[i:i + DATA_PACKET_SIZE]
            logger.debug("Sending Firmware bytes [%s, %s]", i, i + len(data_to_send))

            packets_sent += 1
            # Flag the expected notification before sending the packet that
            # completes the batch.
            if NUM_OF_PACKETS_BETWEEN_NOTIF and packets_sent % NUM_OF_PACKETS_BETWEEN_NOTIF == 0:
                self.set_waiting_for_notification()

            self.send_packet_data(data_to_send)

        self._wait_for_condition(self.get_received_response, waiting_for="response for RECEIVE FIRMWARE IMAGE")
        self.clear_received_response()

    def send_validate_firmware(self):
        """
        Send the VALIDATE FIRMWARE IMAGE command and wait for the response.

        :raises NordicSemiException: On timeout or error response from the device
        :raises IllegalStateException: If the transport is closed while waiting
        :return:
        """
        super(DfuTransportBle, self).send_validate_firmware()
        logger.debug("Sending 'VALIDATE FIRMWARE IMAGE' command")
        self.send_control_data(DfuOpcodesBle.VALIDATE_FIRMWARE_IMAGE)
        self._wait_for_condition(self.get_received_response, waiting_for="response for VALIDATE FIRMWARE IMAGE")
        self.clear_received_response()
        logger.info("Firmware validated OK.")

    def send_activate_firmware(self):
        """
        Send the ACTIVATE FIRMWARE AND RESET command; no response is awaited.

        :return:
        """
        super(DfuTransportBle, self).send_activate_firmware()
        logger.debug("Sending 'ACTIVATE FIRMWARE AND RESET' command")
        self.send_control_data(DfuOpcodesBle.ACTIVATE_FIRMWARE_AND_RESET)