dfu_transport_serial.py 9.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272
  1. # Copyright (c) 2015, Nordic Semiconductor
  2. # All rights reserved.
  3. #
  4. # Redistribution and use in source and binary forms, with or without
  5. # modification, are permitted provided that the following conditions are met:
  6. #
  7. # * Redistributions of source code must retain the above copyright notice, this
  8. # list of conditions and the following disclaimer.
  9. #
  10. # * Redistributions in binary form must reproduce the above copyright notice,
  11. # this list of conditions and the following disclaimer in the documentation
  12. # and/or other materials provided with the distribution.
  13. #
  14. # * Neither the name of Nordic Semiconductor ASA nor the names of its
  15. # contributors may be used to endorse or promote products derived from
  16. # this software without specific prior written permission.
  17. #
  18. # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
  19. # AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
  20. # IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
  21. # DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
  22. # FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
  23. # DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
  24. # SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
  25. # CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
  26. # OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
  27. # OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
  28. # Python imports
  29. import time
  30. from datetime import datetime, timedelta
  31. import binascii
  32. import logging
  33. # Python 3rd party imports
  34. from serial import Serial
  35. # Nordic Semiconductor imports
  36. from nordicsemi.dfu.util import slip_parts_to_four_bytes, slip_encode_esc_chars, int16_to_bytes, int32_to_bytes
  37. from nordicsemi.dfu import crc16
  38. from nordicsemi.exceptions import NordicSemiException
  39. from nordicsemi.dfu.dfu_transport import DfuTransport, DfuEvent
  40. logger = logging.getLogger(__name__)
  41. class DfuTransportSerial(DfuTransport):
  42. DEFAULT_BAUD_RATE = 38400
  43. DEFAULT_FLOW_CONTROL = False
  44. DEFAULT_SERIAL_PORT_TIMEOUT = 1.0 # Timeout time on serial port read
  45. ACK_PACKET_TIMEOUT = 1.0 # Timeout time for for ACK packet received before reporting timeout through event system
  46. SEND_INIT_PACKET_WAIT_TIME = 1.0 # Time to wait before communicating with bootloader after init packet is sent
  47. SEND_START_DFU_WAIT_TIME = 10.0 # Time to wait before communicating with bootloader after start DFU packet is sent
  48. DFU_PACKET_MAX_SIZE = 512 # The DFU packet max size
  49. def __init__(self, com_port, baud_rate=DEFAULT_BAUD_RATE, flow_control=DEFAULT_FLOW_CONTROL, timeout=DEFAULT_SERIAL_PORT_TIMEOUT):
  50. super(DfuTransportSerial, self).__init__()
  51. self.com_port = com_port
  52. self.baud_rate = baud_rate
  53. self.flow_control = 1 if flow_control else 0
  54. self.timeout = timeout
  55. self.serial_port = None
  56. """:type: serial.Serial """
  57. def open(self):
  58. super(DfuTransportSerial, self).open()
  59. try:
  60. self.serial_port = Serial(port=self.com_port, baudrate=self.baud_rate, rtscts=self.flow_control, timeout=self.timeout)
  61. except Exception, e:
  62. raise NordicSemiException("Serial port could not be opened on {0}. Reason: {1}".format(self.com_port, e.message))
  63. def close(self):
  64. super(DfuTransportSerial, self).close()
  65. self.serial_port.close()
  66. def is_open(self):
  67. super(DfuTransportSerial, self).is_open()
  68. if self.serial_port is None:
  69. return False
  70. return self.serial_port.isOpen()
  71. def send_validate_firmware(self):
  72. super(DfuTransportSerial, self).send_validate_firmware()
  73. return True
  74. def send_init_packet(self, init_packet):
  75. super(DfuTransportSerial, self).send_init_packet(init_packet)
  76. frame = int32_to_bytes(DFU_INIT_PACKET)
  77. frame += init_packet
  78. frame += int16_to_bytes(0x0000) # Padding required
  79. packet = HciPacket(frame)
  80. self.send_packet(packet)
  81. time.sleep(DfuTransportSerial.SEND_INIT_PACKET_WAIT_TIME)
  82. def send_start_dfu(self, mode, softdevice_size=None, bootloader_size=None, app_size=None):
  83. super(DfuTransportSerial, self).send_start_dfu(mode, softdevice_size, bootloader_size, app_size)
  84. frame = int32_to_bytes(DFU_START_PACKET)
  85. frame += int32_to_bytes(mode)
  86. frame += DfuTransport.create_image_size_packet(softdevice_size, bootloader_size, app_size)
  87. packet = HciPacket(frame)
  88. self.send_packet(packet)
  89. time.sleep(DfuTransportSerial.SEND_START_DFU_WAIT_TIME)
  90. def send_activate_firmware(self):
  91. super(DfuTransportSerial, self).send_activate_firmware()
  92. def send_firmware(self, firmware):
  93. super(DfuTransportSerial, self).send_firmware(firmware)
  94. def progress_percentage(part, whole):
  95. return int(100 * float(part)/float(whole))
  96. frames = []
  97. self._send_event(DfuEvent.PROGRESS_EVENT, progress=0, done=False, log_message="")
  98. for i in range(0, len(firmware), DfuTransportSerial.DFU_PACKET_MAX_SIZE):
  99. data_packet = HciPacket(int32_to_bytes(DFU_DATA_PACKET) + firmware[i:i + DfuTransportSerial.DFU_PACKET_MAX_SIZE])
  100. frames.append(data_packet)
  101. frames_count = len(frames)
  102. # Send firmware packets
  103. for count, pkt in enumerate(frames):
  104. self.send_packet(pkt)
  105. self._send_event(DfuEvent.PROGRESS_EVENT,
  106. log_message="",
  107. progress=progress_percentage(count, frames_count),
  108. done=False)
  109. # Send data stop packet
  110. frame = int32_to_bytes(DFU_STOP_DATA_PACKET)
  111. packet = HciPacket(frame)
  112. self.send_packet(packet)
  113. self._send_event(DfuEvent.PROGRESS_EVENT, progress=100, done=False, log_message="")
  114. def send_packet(self, pkt):
  115. attempts = 0
  116. last_ack = None
  117. packet_sent = False
  118. logger.debug("PC -> target: {0}".format(pkt))
  119. while not packet_sent:
  120. self.serial_port.write(pkt.data)
  121. attempts += 1
  122. ack = self.get_ack_nr()
  123. if last_ack is None:
  124. break
  125. if ack == (last_ack + 1) % 8:
  126. last_ack = ack
  127. packet_sent = True
  128. if attempts > 3:
  129. raise Exception("Three failed tx attempts encountered on packet {0}".format(pkt.sequence_number))
  130. def get_ack_nr(self):
  131. def is_timeout(start_time, timeout_sec):
  132. return not (datetime.now() - start_time <= timedelta(0, timeout_sec))
  133. uart_buffer = ''
  134. start = datetime.now()
  135. while uart_buffer.count('\xC0') < 2:
  136. # Disregard first of the two C0
  137. temp = self.serial_port.read(6)
  138. if temp:
  139. uart_buffer += temp
  140. if is_timeout(start, DfuTransportSerial.ACK_PACKET_TIMEOUT):
  141. # reset HciPacket numbering back to 0
  142. HciPacket.sequence_number = 0
  143. self._send_event(DfuEvent.TIMEOUT_EVENT,
  144. log_message="Timed out waiting for acknowledgement from device.")
  145. # quit loop
  146. break
  147. # read until you get a new C0
  148. # RESUME_WORK
  149. if len(uart_buffer) < 2:
  150. raise NordicSemiException("No data received on serial port. Not able to proceed.")
  151. logger.debug("PC <- target: {0}".format(binascii.hexlify(uart_buffer)))
  152. data = self.decode_esc_chars(uart_buffer)
  153. # Remove 0xC0 at start and beginning
  154. data = data[1:-1]
  155. # Extract ACK number from header
  156. return (data[0] >> 3) & 0x07
  157. @staticmethod
  158. def decode_esc_chars(data):
  159. """Replace 0xDBDC with 0xCO and 0xDBDD with 0xDB"""
  160. result = []
  161. data = bytearray(data)
  162. while len(data):
  163. char = data.pop(0)
  164. if char == 0xDB:
  165. char2 = data.pop(0)
  166. if char2 == 0xDC:
  167. result.append(0xC0)
  168. elif char2 == 0xDD:
  169. result.append(0xDB)
  170. else:
  171. raise Exception('Char 0xDB NOT followed by 0xDC or 0xDD')
  172. else:
  173. result.append(char)
  174. return result
  175. DATA_INTEGRITY_CHECK_PRESENT = 1
  176. RELIABLE_PACKET = 1
  177. HCI_PACKET_TYPE = 14
  178. DFU_INIT_PACKET = 1
  179. DFU_START_PACKET = 3
  180. DFU_DATA_PACKET = 4
  181. DFU_STOP_DATA_PACKET = 5
  182. DFU_UPDATE_MODE_NONE = 0
  183. DFU_UPDATE_MODE_SD = 1
  184. DFU_UPDATE_MODE_BL = 2
  185. DFU_UPDATE_MODE_APP = 4
  186. class HciPacket(object):
  187. """Class representing a single HCI packet"""
  188. sequence_number = 0
  189. def __init__(self, data=''):
  190. HciPacket.sequence_number = (HciPacket.sequence_number + 1) % 8
  191. self.temp_data = ''
  192. self.temp_data += slip_parts_to_four_bytes(HciPacket.sequence_number,
  193. DATA_INTEGRITY_CHECK_PRESENT,
  194. RELIABLE_PACKET,
  195. HCI_PACKET_TYPE,
  196. len(data))
  197. self.temp_data += data
  198. # Add escape characters
  199. crc = crc16.calc_crc16(self.temp_data, crc=0xffff)
  200. self.temp_data += chr(crc & 0xFF)
  201. self.temp_data += chr((crc & 0xFF00) >> 8)
  202. self.temp_data = slip_encode_esc_chars(self.temp_data)
  203. self.data = chr(0xc0)
  204. self.data += self.temp_data
  205. self.data += chr(0xc0)
  206. def __str__(self):
  207. return binascii.hexlify(self.data)