[core] Migrate tools to ltchiptool

This commit is contained in:
Kuba Szczodrzyński
2022-08-03 13:22:03 +02:00
parent d8c0105b97
commit 1e3a8971fb
44 changed files with 117 additions and 3907 deletions

View File

@@ -14,8 +14,7 @@
### Tools
- move all UF2 assembling/uploading/processing tools (as well as `uf2ota` C library) to a separate repository, possibly rewriting parts of it again. Make these tools CLI-usable
- write OpenOCD flashers, using uf2ota library + FAL for partitions (same repo as above)
- write OpenOCD flashers, using uf2ota library + FAL for partitions (in ltchiptool repository)
### Serial
@@ -31,10 +30,8 @@
## BK7231
- WiFi events
- implement OTA
- fix WiFi on BK7231N, test other functionality
- add generic board definition
- fix SSL (mbedTLS)
- I2C (Wire)
- SPI
@@ -42,6 +39,5 @@
## RTL8710B
- add generic board definition
- move to GNU++11 (and verify that it works) - take all stdio functions from stdio.h
- rewrite most of Wiring (it was copied from `ambd_arduino`, and is ugly)

View File

@@ -2,8 +2,7 @@
#include <Arduino.h>
#include <functional>
#include "uf2ota/uf2ota.h"
#include <uf2ota/uf2ota.h>
// No Error
#define UPDATE_ERROR_OK (0)

View File

@@ -1,32 +0,0 @@
/* Copyright (c) Kuba Szczodrzyński 2022-05-29. */
#include "uf2priv.h"
uf2_err_t uf2_binpatch(uint8_t *data, const uint8_t *binpatch, uint8_t binpatch_len) {
const uint8_t *binpatch_end = binpatch + binpatch_len;
// +2 to make sure opcode and length is present
while ((binpatch + 2) < binpatch_end) {
uf2_opcode_t opcode = binpatch[0];
uint8_t len = binpatch[1];
switch (opcode) {
case UF2_OPC_DIFF32:
uf2_binpatch_diff32(data, binpatch + 1);
break;
}
// advance by opcode + length + data
binpatch += len + 2;
}
return UF2_ERR_OK;
}
void uf2_binpatch_diff32(uint8_t *data, const uint8_t *patch) {
uint8_t num_offs = patch[0] - 4; // read offset count
uint32_t diff = *((uint32_t *)(patch + 1)); // read diff value
patch += 5; // skip num_offs and diff value
for (uint8_t i = 0; i < num_offs; i++) {
// patch the data
uint8_t offs = patch[i];
uint32_t *value = (uint32_t *)(data + offs);
*(value) += diff;
}
}

View File

@@ -1,26 +0,0 @@
/* Copyright (c) Kuba Szczodrzyński 2022-05-29. */
#pragma once
#include "uf2types.h"
/**
* @brief Apply binary patch to data.
*
* @param data input data
* @param data_len input data length
* @param binpatch binary patch data
* @param binpatch_len binary patch data length
* @return uf2_err_t error code
*/
uf2_err_t uf2_binpatch(uint8_t *data, const uint8_t *binpatch, uint8_t binpatch_len);
/**
* Apply DIFF32 binary patch.
*
* @param data input data
* @param len input data length
* @param patch patch data, incl. length byte
* @return uf2_err_t error code
*/
void uf2_binpatch_diff32(uint8_t *data, const uint8_t *patch);

View File

@@ -1,100 +0,0 @@
/* Copyright (c) Kuba Szczodrzyński 2022-05-29. */
#include "uf2priv.h"
uf2_ota_t *uf2_ctx_init(uint8_t ota_idx, uint32_t family_id) {
uf2_ota_t *ctx = (uf2_ota_t *)zalloc(sizeof(uf2_ota_t));
ctx->ota_idx = ota_idx;
ctx->family_id = family_id;
return ctx;
}
uf2_info_t *uf2_info_init() {
uf2_info_t *info = (uf2_info_t *)zalloc(sizeof(uf2_info_t));
return info;
}
void uf2_info_free(uf2_info_t *info) {
if (!info)
return;
free(info->fw_name);
free(info->fw_version);
free(info->lt_version);
free(info->board);
free(info);
}
uf2_err_t uf2_check_block(uf2_ota_t *ctx, uf2_block_t *block) {
if (block->magic1 != UF2_MAGIC_1)
return UF2_ERR_MAGIC;
if (block->magic2 != UF2_MAGIC_2)
return UF2_ERR_MAGIC;
if (block->magic3 != UF2_MAGIC_3)
return UF2_ERR_MAGIC;
if (block->file_container)
// ignore file containers, for now
return UF2_ERR_IGNORE;
if (!block->has_family_id || block->file_size != ctx->family_id)
// require family_id
return UF2_ERR_FAMILY;
return UF2_ERR_OK;
}
uf2_err_t uf2_parse_header(uf2_ota_t *ctx, uf2_block_t *block, uf2_info_t *info) {
if (!block->has_tags || block->file_container || block->len)
// header must have tags and no data
return UF2_ERR_NOT_HEADER;
uf2_err_t err = uf2_parse_block(ctx, block, info);
if (err)
return err;
if ((ctx->ota_idx == 1 && !ctx->has_ota1) || !ctx->has_ota2)
return UF2_ERR_OTA_WRONG;
return UF2_ERR_OK;
}
uf2_err_t uf2_write(uf2_ota_t *ctx, uf2_block_t *block) {
if (ctx->seq == 0)
return uf2_parse_header(ctx, block, NULL);
if (block->not_main_flash || !block->len)
// ignore blocks not meant for flashing
return UF2_ERR_IGNORE;
uf2_err_t err = uf2_parse_block(ctx, block, NULL);
if (err)
return err;
if (!ctx->part1 && !ctx->part2)
// no partitions set at all
return UF2_ERR_PART_UNSET;
fal_partition_t part = uf2_get_target_part(ctx);
if (!part)
// image is not for current OTA scheme
return UF2_ERR_IGNORE;
if (ctx->ota_idx == 2 && ctx->binpatch_len) {
// apply binpatch
err = uf2_binpatch(block->data, ctx->binpatch, ctx->binpatch_len);
if (err)
return err;
}
int ret;
// erase sectors if needed
if (!uf2_is_erased(ctx, block->addr, block->len)) {
ret = fal_partition_erase(part, block->addr, block->len);
if (ret < 0)
return UF2_ERR_ERASE_FAILED;
ctx->erased_offset = block->addr;
ctx->erased_length = ret;
}
// write data to flash
ret = fal_partition_write(part, block->addr, block->data, block->len);
if (ret < 0)
return UF2_ERR_WRITE_FAILED;
if (ret != block->len)
return UF2_ERR_WRITE_LENGTH;
return UF2_ERR_OK;
}

View File

@@ -1,68 +0,0 @@
/* Copyright (c) Kuba Szczodrzyński 2022-05-28. */
#pragma once
#ifdef __cplusplus
extern "C" {
#endif // __cplusplus
#include "uf2types.h"
/**
* @brief Create an UF2 OTA context.
*
* @param ota_idx target OTA index
* @param family_id expected family ID
* @return uf2_ota_t* heap-allocated structure
*/
uf2_ota_t *uf2_ctx_init(uint8_t ota_idx, uint32_t family_id);
/**
* @brief Create an UF2 Info structure.
*
* @return uf2_info_t* heap-allocated structure
*/
uf2_info_t *uf2_info_init();
/**
* @brief Free values in the info structure AND the structure itself.
*
* @param info structure to free; may be NULL
*/
void uf2_info_free(uf2_info_t *info);
/**
* @brief Check if block is valid.
*
* @param ctx context
* @param block block to check
* @return uf2_err_t error code; UF2_ERR_OK and UF2_ERR_IGNORE denote valid blocks
*/
uf2_err_t uf2_check_block(uf2_ota_t *ctx, uf2_block_t *block);
/**
* @brief Parse header block (LibreTuya UF2 first block).
*
* Note: caller should call uf2_check_block() first.
*
* @param ctx context
* @param block block to parse
* @param info structure to write firmware info, NULL if not used
* @return uf2_err_t error code
*/
uf2_err_t uf2_parse_header(uf2_ota_t *ctx, uf2_block_t *block, uf2_info_t *info);
/**
* @brief Write the block to flash memory.
*
* Note: caller should call uf2_check_block() first.
*
* @param ctx context
* @param block block to write
* @return uf2_err_t error code
*/
uf2_err_t uf2_write(uf2_ota_t *ctx, uf2_block_t *block);
#ifdef __cplusplus
} // extern "C"
#endif

View File

@@ -1,146 +0,0 @@
/* Copyright (c) Kuba Szczodrzyński 2022-05-29. */
#include "uf2priv.h"
uf2_err_t uf2_parse_block(uf2_ota_t *ctx, uf2_block_t *block, uf2_info_t *info) {
if (block->block_seq != ctx->seq)
// sequence number must match
return UF2_ERR_SEQ_MISMATCH;
ctx->seq++; // increment sequence number after checking it
if (!block->has_tags)
// no tags in this block, no further processing needed
return UF2_ERR_OK;
if (block->len > (476 - 4 - 4))
// at least one tag + last tag must fit
return UF2_ERR_DATA_TOO_LONG;
uint8_t *tags_start = block->data + block->len;
uint8_t tags_len = 476 - block->len;
uint8_t tags_pos = 0;
if (block->has_md5)
tags_len -= 24;
ctx->binpatch_len = 0; // binpatch applies to one block only
char *part1 = NULL;
char *part2 = NULL;
uf2_tag_type_t type;
while (tags_pos < tags_len) {
uint8_t len = uf2_read_tag(tags_start + tags_pos, &type);
if (!len)
break;
tags_pos += 4; // skip tag header
uint8_t *tag = tags_start + tags_pos;
char **str_dest = NULL; // char* to copy the tag into
switch (type) {
case UF2_TAG_OTA_VERSION:
if (tag[0] != 1)
return UF2_ERR_OTA_VER;
break;
case UF2_TAG_FIRMWARE:
if (info)
str_dest = &(info->fw_name);
break;
case UF2_TAG_VERSION:
if (info)
str_dest = &(info->fw_version);
break;
case UF2_TAG_LT_VERSION:
if (info)
str_dest = &(info->lt_version);
break;
case UF2_TAG_BOARD:
if (info)
str_dest = &(info->board);
break;
case UF2_TAG_HAS_OTA1:
ctx->has_ota1 = tag[0];
break;
case UF2_TAG_HAS_OTA2:
ctx->has_ota2 = tag[0];
break;
case UF2_TAG_PART_1:
str_dest = &(part1);
break;
case UF2_TAG_PART_2:
str_dest = &(part2);
break;
case UF2_TAG_BINPATCH:
ctx->binpatch = tag;
ctx->binpatch_len = len;
break;
default:
break;
}
if (str_dest) {
*str_dest = (char *)zalloc(len + 1);
memcpy(*str_dest, tag, len);
}
// align position to 4 bytes
tags_pos += (((len - 1) / 4) + 1) * 4;
}
if (part1 && part2) {
// update current target partition
uf2_err_t err = uf2_update_parts(ctx, part1, part2);
if (err)
return err;
} else if (part1 || part2) {
// only none or both partitions can be specified
return UF2_ERR_PART_ONE;
}
return UF2_ERR_OK;
}
uint8_t uf2_read_tag(const uint8_t *data, uf2_tag_type_t *type) {
uint8_t len = data[0];
if (!len)
return 0;
uint32_t tag_type = *((uint32_t *)data);
if (!tag_type)
return 0;
*type = tag_type >> 8; // remove tag length byte
return len - 4;
}
uf2_err_t uf2_update_parts(uf2_ota_t *ctx, char *part1, char *part2) {
// reset both target partitions
ctx->part1 = NULL;
ctx->part2 = NULL;
// reset offsets as they probably don't apply to this partition
ctx->erased_offset = 0;
ctx->erased_length = 0;
if (part1[0]) {
ctx->part1 = (fal_partition_t)fal_partition_find(part1);
if (!ctx->part1)
return UF2_ERR_PART_404;
}
if (part2[0]) {
ctx->part2 = (fal_partition_t)fal_partition_find(part2);
if (!ctx->part2)
return UF2_ERR_PART_404;
}
return UF2_ERR_OK;
}
fal_partition_t uf2_get_target_part(uf2_ota_t *ctx) {
if (ctx->ota_idx == 1)
return ctx->part1;
if (ctx->ota_idx == 2)
return ctx->part2;
return NULL;
}
bool uf2_is_erased(uf2_ota_t *ctx, uint32_t offset, uint32_t length) {
uint32_t erased_end = ctx->erased_offset + ctx->erased_length;
uint32_t end = offset + length;
return (offset >= ctx->erased_offset) && (end <= erased_end);
}

View File

@@ -1,61 +0,0 @@
/* Copyright (c) Kuba Szczodrzyński 2022-05-28. */
#pragma once
// include family stdlib APIs
#include <WVariant.h>
#include "uf2binpatch.h"
#include "uf2types.h"
/**
* @brief Parse a block and extract information from tags.
*
* @param ctx context
* @param block block to parse
* @param info structure to write firmware info, NULL if not used
* @return uf2_err_t error code
*/
uf2_err_t uf2_parse_block(uf2_ota_t *ctx, uf2_block_t *block, uf2_info_t *info);
/**
* @brief Parse a tag.
*
* @param data pointer to tag header beginning
* @param type [out] parsed tag type
* @return uint8_t parsed tag data length (excl. header); 0 if invalid/last tag
*/
uint8_t uf2_read_tag(const uint8_t *data, uf2_tag_type_t *type);
/**
* @brief Update destination partitions in context.
*
* Partition names cannot be NULL.
*
* Returns UF2_ERR_IGNORE if specified partitions don't match the
* current OTA index.
*
* @param ctx context
* @param part1 partition 1 name or empty string
* @param part2 partition 2 name or empty string
* @return uf2_err_t error code
*/
uf2_err_t uf2_update_parts(uf2_ota_t *ctx, char *part1, char *part2);
/**
* @brief Get target flashing partition, depending on OTA index.
*
* @param ctx context
* @return fal_partition_t target partition or NULL if not set
*/
fal_partition_t uf2_get_target_part(uf2_ota_t *ctx);
/**
* Check if specified flash memory region was already erased during update.
*
* @param ctx context
* @param offset offset to check
* @param length length to check
* @return bool true/false
*/
bool uf2_is_erased(uf2_ota_t *ctx, uint32_t offset, uint32_t length);

View File

@@ -1,104 +0,0 @@
/* Copyright (c) Kuba Szczodrzyński 2022-05-28. */
#pragma once
#include <stdbool.h>
#include <stdint.h>
#include <fal.h>
#define UF2_MAGIC_1 0x0A324655
#define UF2_MAGIC_2 0x9E5D5157
#define UF2_MAGIC_3 0x0AB16F30
#define UF2_BLOCK_SIZE sizeof(uf2_block_t)
typedef struct __attribute__((packed)) {
// 32 byte header
uint32_t magic1;
uint32_t magic2;
// flags split as bitfields
bool not_main_flash : 1;
uint16_t dummy1 : 11;
bool file_container : 1;
bool has_family_id : 1;
bool has_md5 : 1;
bool has_tags : 1;
uint16_t dummy2 : 16;
uint32_t addr;
uint32_t len;
uint32_t block_seq;
uint32_t block_count;
uint32_t file_size; // or familyID;
uint8_t data[476];
uint32_t magic3;
} uf2_block_t;
typedef struct {
uint32_t seq; // current block sequence number
uint8_t *binpatch; // current block's binpatch (if any) -> pointer inside block->data
uint8_t binpatch_len; // binpatch length
bool has_ota1; // image has any data for OTA1
bool has_ota2; // image has any data for OTA2
uint8_t ota_idx; // target OTA index
uint32_t family_id; // expected family ID
uint32_t erased_offset; // offset of region erased during update
uint32_t erased_length; // length of erased region
fal_partition_t part1; // OTA1 target partition
fal_partition_t part2; // OTA2 target partition
} uf2_ota_t;
typedef struct {
char *fw_name;
char *fw_version;
char *lt_version;
char *board;
} uf2_info_t;
typedef enum {
UF2_TAG_VERSION = 0x9FC7BC, // version of firmware file - UTF8 semver string
UF2_TAG_PAGE_SIZE = 0x0BE9F7, // page size of target device (32 bit unsigned number)
UF2_TAG_SHA2 = 0xB46DB0, // SHA-2 checksum of firmware (can be of various size)
UF2_TAG_DEVICE = 0x650D9D, // description of device (UTF8)
UF2_TAG_DEVICE_ID = 0xC8A729, // device type identifier
// LibreTuya custom, tags
UF2_TAG_OTA_VERSION = 0x5D57D0, // format version
UF2_TAG_BOARD = 0xCA25C8, // board name (lowercase code)
UF2_TAG_FIRMWARE = 0x00DE43, // firmware description / name
UF2_TAG_BUILD_DATE = 0x822F30, // build date/time as Unix timestamp
UF2_TAG_LT_VERSION = 0x59563D, // LT version (semver)
UF2_TAG_PART_1 = 0x805946, // OTA1 partition name
UF2_TAG_PART_2 = 0xA1E4D7, // OTA2 partition name
UF2_TAG_HAS_OTA1 = 0xBBD965, // image has any data for OTA1
UF2_TAG_HAS_OTA2 = 0x92280E, // image has any data for OTA2
UF2_TAG_BINPATCH = 0xB948DE, // binary patch to convert OTA1->OTA2
} uf2_tag_type_t;
typedef enum {
UF2_OPC_DIFF32 = 0xFE,
} uf2_opcode_t;
typedef enum {
UF2_ERR_OK = 0,
UF2_ERR_IGNORE, // block should be ignored
UF2_ERR_MAGIC, // wrong magic numbers
UF2_ERR_FAMILY, // family ID mismatched
UF2_ERR_NOT_HEADER, // block is not a header
UF2_ERR_OTA_VER, // unknown/invalid OTA format version
UF2_ERR_OTA_WRONG, // no data for current OTA index
UF2_ERR_PART_404, // no partition with that name
UF2_ERR_PART_ONE, // only one partition tag in a block
UF2_ERR_PART_UNSET, // image broken - attempted to write without target partition
UF2_ERR_DATA_TOO_LONG, // data too long - tags won't fit
UF2_ERR_SEQ_MISMATCH, // sequence number mismatched
UF2_ERR_ERASE_FAILED, // erasing flash failed
UF2_ERR_WRITE_FAILED, // writing to flash failed
UF2_ERR_WRITE_LENGTH, // wrote fewer data than requested
} uf2_err_t;

View File

@@ -76,6 +76,19 @@ env.AddLibrary(
],
)
# Sources - uf2ota library
ltchiptool_dir = platform.get_package_dir(f"tool-ltchiptool")
env.AddLibrary(
name="uf2ota",
base_dir=ltchiptool_dir,
srcs=[
"+<uf2ota/*.c>",
],
includes=[
"+<.>",
],
)
# Sources - board variant
env.AddLibrary(
name="board_${VARIANT}",

View File

@@ -590,7 +590,7 @@ env.BuildLibraries()
# Main firmware outputs and actions
env.Replace(
# linker command (encryption + packaging)
LINK="${LINK2BIN} ${VARIANT} '' ''",
LINK="${LTCHIPTOOL} link2bin ${VARIANT} '' ''",
# UF2OTA input list
UF2OTA=[
# app binary image (enc+crc), OTA1 (uploader) only

View File

@@ -294,7 +294,7 @@ env.BuildLibraries()
# Main firmware outputs and actions
env.Replace(
# linker command (dual .bin outputs)
LINK="${LINK2BIN} ${VARIANT} xip1 xip2",
LINK="${LTCHIPTOOL} link2bin ${VARIANT} xip1 xip2",
# default output .bin name
IMG_FW="image_${FLASH_OTA1_OFFSET}.ota1.bin",
# UF2OTA input list

View File

@@ -2,16 +2,15 @@
from os.path import join
from ltchiptool import Family
from SCons.Script import DefaultEnvironment
from tools.util.platform import get_family
env = DefaultEnvironment()
def env_add_defaults(env, platform, board):
# Get Family object for this board
family = get_family(short_name=board.get("build.family"))
family = Family.get(short_name=board.get("build.family"))
# Default environment variables
vars = dict(
SDK_DIR=platform.get_package_dir(family.framework),
@@ -36,10 +35,8 @@ def env_add_defaults(env, platform, board):
VARIANT=board.get("build.variant"),
LDSCRIPT_SDK=board.get("build.ldscript_sdk"),
LDSCRIPT_ARDUINO=board.get("build.ldscript_arduino"),
# Link2Bin tool
LINK2BIN='"${PYTHONEXE}" "${LT_DIR}/tools/link2bin.py"',
UF2OTA_PY='"${PYTHONEXE}" "${LT_DIR}/tools/uf2ota/uf2ota.py"',
UF2UPLOAD_PY='"${PYTHONEXE}" "${LT_DIR}/tools/upload/uf2upload.py"',
# ltchiptool variables
LTCHIPTOOL='"${PYTHONEXE}" -m ltchiptool',
# Fix for link2bin to get tmpfile name in argv
LINKCOM="${LINK} ${LINKARGS}",
LINKARGS="${TEMPFILE('-o $TARGET $LINKFLAGS $__RPATH $SOURCES $_LIBDIRFLAGS $_LIBFLAGS', '$LINKCOMSTR')}",

View File

@@ -35,14 +35,13 @@ def env_uf2ota(env, *args, **kwargs):
env["UF2OUT_BASE"] = basename(output)
cmd = [
"@${UF2OTA_PY}",
"@${LTCHIPTOOL} uf2 write",
f'--output "{output}"',
"--family ${FAMILY}",
"--board ${VARIANT}",
f"--version {lt_version}",
f'--fw "{project_name}:{project_version}"',
f"--date {int(now.timestamp())}",
"write",
inputs,
]
@@ -76,7 +75,10 @@ def env_uf2upload(env, target):
return
# add main upload target
env.Replace(UPLOADER="${UF2UPLOAD_PY}", UPLOADCMD="${UPLOADER} ${UPLOADERFLAGS}")
env.Replace(
UPLOADER="${LTCHIPTOOL} uf2 upload",
UPLOADCMD="${UPLOADER} ${UPLOADERFLAGS}",
)
actions.append(env.VerboseAction("${UPLOADCMD}", "Uploading ${UF2OUT_BASE}"))
env.AddPlatformTarget("upload", target, actions, "Upload")

View File

@@ -3,8 +3,6 @@
from os.path import join
from typing import List
from tools.util.fileio import writetext
class Markdown:
items: List[str]
@@ -15,7 +13,9 @@ class Markdown:
self.output = join(dir, f"{name}.md")
def write(self):
writetext(self.output, self.items)
with open(self.output, "w", encoding="utf-8") as f:
f.write("\n".join(self.items))
f.write("\n")
def pad(self, s: str, i: int) -> str:
return s + " " * (i - len(s))

View File

@@ -4,29 +4,19 @@ import sys
from os.path import dirname, join
sys.path.append(join(dirname(__file__), ".."))
import re
from typing import Dict, List, Set, Tuple
from typing import Dict, List, Set
import colorama
from colorama import Fore, Style
from tools.util.fileio import readjson, readtext
from tools.util.markdown import Markdown
from tools.util.obj import get, sizeof
from tools.util.platform import (
get_board_list,
get_board_manifest,
get_families,
get_family,
)
from ltchiptool import Board, Family
from ltchiptool.util import readjson, readtext, sizeof
from markdown import Markdown
OUTPUT = join(dirname(__file__), "status")
def load_boards() -> Dict[str, dict]:
return {board: get_board_manifest(board) for board in get_board_list()}
def load_chip_type_h() -> str:
code = readtext(
join(
@@ -43,24 +33,15 @@ def load_chip_type_h() -> str:
return code
def check_mcus(boards: List[Tuple[str, dict]]) -> bool:
for board_name, board in boards:
def check_mcus(boards: List[Board]) -> bool:
for board in boards:
# check if all boards' MCUs are defined in families.json
family_name: str = get(board, "build.family")
mcu_name: str = get(board, "build.mcu")
family = get_family(short_name=family_name)
if not family:
print(
Fore.RED
+ f"ERROR: Family '{family_name}' of board '{board_name}' does not exist"
+ Style.RESET_ALL
)
return False
mcus = [mcu.lower() for mcu in family.mcus]
mcu_name: str = board["build.mcu"]
mcus = [mcu.lower() for mcu in board.family.mcus]
if mcu_name not in mcus:
print(
Fore.RED
+ f"ERROR: MCU '{mcu_name}' of board '{board_name}' is not defined for family '{family_name}'"
+ f"ERROR: MCU '{mcu_name}' of board '{board.name}' is not defined for family '{board.family.name}'"
+ Style.RESET_ALL
)
return False
@@ -69,19 +50,19 @@ def check_mcus(boards: List[Tuple[str, dict]]) -> bool:
def get_family_mcus() -> Set[str]:
out = []
for family in get_families():
for family in Family.get_all():
out += family.mcus
return set(out)
def get_family_names() -> Set[str]:
return set(family.short_name for family in get_families())
return set(family.short_name for family in Family.get_all())
def get_board_mcus(boards: List[Tuple[str, dict]]) -> Set[str]:
def get_board_mcus(boards: List[Board]) -> Set[str]:
out = set()
for _, board in boards:
mcu_name: str = get(board, "build.mcu")
for board in boards:
mcu_name: str = board["build.mcu"]
out.add(mcu_name.upper())
return out
@@ -103,28 +84,27 @@ def get_enum_families(code: str) -> Set[str]:
return set(family[2:] for family in get_enum_keys(code, "ChipFamily"))
def board_sort(tpl):
generic = tpl[0].lower().startswith("generic")
vendor = get(tpl[1], "vendor")
def board_json_sort(tpl):
return tpl[1]["mcu"], tpl[0]
def board_obj_sort(board: Board):
generic = board.is_generic
vendor = board.vendor
if vendor == "N/A":
vendor = "\xff"
generic = False
return (
not generic, # reverse
vendor,
get(tpl[1], "build.mcu"),
get(tpl[1], "mcu"),
tpl[0],
board["build.mcu"],
board["mcu"],
board.name,
)
def get_board_symbol(board_name: str, board: dict) -> str:
symbol = get(board, "symbol")
if not symbol and board_name.startswith("generic-"):
symbol = board_name[8:]
else:
symbol = symbol or board_name.upper()
return symbol
def get_board_symbol(board: Board) -> str:
return board.symbol or board.generic_name or board.name.upper()
def write_chips(mcus: List[str]):
@@ -133,7 +113,7 @@ def write_chips(mcus: List[str]):
md.write()
def write_boards(boards: List[Tuple[str, dict]]):
def write_boards(boards: List[Board]):
md = Markdown(OUTPUT, "supported_boards")
header = [
"Name",
@@ -149,34 +129,33 @@ def write_boards(boards: List[Tuple[str, dict]]):
rows = []
vendor_prev = ""
for board_name, board in boards:
family = get_family(short_name=get(board, "build.family"))
for board in boards:
# add board vendor as a row
vendor = get(board, "vendor")
vendor = board["vendor"]
if vendor_prev != vendor:
rows.append([f"**{vendor}**"])
vendor_prev = vendor
# count total pin count & IO count
pins = "-"
pinout: Dict[str, dict] = get(board, "pcb.pinout")
pinout: Dict[str, dict] = board["pcb.pinout"]
if pinout:
pinout = [pin for name, pin in pinout.items() if name.isnumeric()]
pins_total = len(pinout)
pins_io = sum(1 for pin in pinout if "ARD" in pin)
pins = f"{pins_total} ({pins_io} I/O)"
# format row values
symbol = get_board_symbol(board_name, board)
board_url = f"[{symbol}](../../boards/{board_name}/README.md)"
symbol = get_board_symbol(board)
board_url = f"[{symbol}](../../boards/{board.name}/README.md)"
row = [
board_url,
get(board, "build.mcu").upper(),
sizeof(get(board, "upload.flash_size")),
sizeof(get(board, "upload.maximum_ram_size")),
board["build.mcu"].upper(),
sizeof(board["upload.flash_size"]),
sizeof(board["upload.maximum_ram_size"]),
pins,
"✔️" if "wifi" in get(board, "connectivity") else "",
"✔️" if "ble" in get(board, "connectivity") else "",
"✔️" if "zigbee" in get(board, "connectivity") else "",
f"`{family.name}`",
"✔️" if "wifi" in board["connectivity"] else "",
"✔️" if "ble" in board["connectivity"] else "",
"✔️" if "zigbee" in board["connectivity"] else "",
f"`{board.family.name}`",
]
rows.append(row)
md.add_table(header, *rows)
@@ -204,7 +183,7 @@ def write_unsupported_boards(
series_rows = []
series_rows.append([f"**{series_name.upper()} Series**"])
boards = series[series_name]
for board_name, board in sorted(boards.items(), key=board_sort):
for board_name, board in sorted(boards.items(), key=board_json_sort):
if board_name in supported:
continue
row = [
@@ -236,7 +215,7 @@ def write_families():
]
rows = []
for family in get_families():
for family in Family.get_all():
row = [
# Title
"[{}]({})".format(
@@ -274,14 +253,14 @@ def write_families():
md.write()
def write_boards_list(boards: List[Tuple[str, dict]]):
def write_boards_list(boards: List[Board]):
md = Markdown(dirname(__file__), join("..", "boards", "SUMMARY"))
items = []
for board_name, board in boards:
symbol = get_board_symbol(board_name, board)
if board_name.startswith("generic-"):
symbol = get(board, "name")
items.append(f"[{symbol}](../boards/{board_name}/README.md)")
for board in boards:
symbol = get_board_symbol(board)
if board.is_generic:
symbol = board["name"]
items.append(f"[{symbol}](../boards/{board.name}/README.md)")
md.add_list(*items)
md.write()
@@ -289,18 +268,17 @@ def write_boards_list(boards: List[Tuple[str, dict]]):
if __name__ == "__main__":
colorama.init()
boards = load_boards()
boards = sorted(boards.items(), key=board_sort)
boards = map(Board, Board.get_list())
boards = sorted(boards, key=board_obj_sort)
code = load_chip_type_h()
errors = False
for name, board in boards:
variant = get(board, "build.variant")
if name != variant:
for board in boards:
if board.name != board["source"]:
print(
Fore.RED
+ f"ERROR: Invalid build.variant of '{name}': '{variant}'"
+ f"ERROR: Invalid build.variant of '{board['source']}': '{board.name}'"
+ Style.RESET_ALL
)
errors = True
@@ -347,5 +325,5 @@ if __name__ == "__main__":
write_unsupported_boards(
series=data,
name=f"unsupported_{name}",
supported=[tpl[0] for tpl in boards],
supported=[board.name for board in boards],
)

View File

@@ -120,6 +120,11 @@
"~1.100301.0"
]
},
"tool-ltchiptool": {
"type": "uploader",
"version": "https://github.com/libretuya/ltchiptool#v1.2.1",
"note": "This is used only for C/C++ code from ltchiptool."
},
"tool-openocd": {
"type": "uploader",
"optional": true,

View File

@@ -1,26 +1,48 @@
# Copyright (c) Kuba Szczodrzyński 2022-04-20.
import importlib
import json
import sys
from os import system
from os.path import dirname, join
from typing import Dict
from platformio import util
from platformio.debug.config.base import DebugConfigBase
from platformio.debug.exception import DebugInvalidOptionsError
from platformio.managers.platform import PlatformBase
from platformio.package.exception import MissingPackageManifestError
from platformio.package.manager.base import BasePackageManager
from platformio.package.meta import PackageItem, PackageSpec
from platformio.platform.base import PlatformBase
from platformio.platform.board import PlatformBoardConfig
from semantic_version import Version
# Make tools available
sys.path.insert(0, dirname(__file__))
from tools.util.platform import get_board_manifest
# Install & import tools
def check_ltchiptool():
global ltchiptool
import ltchiptool
importlib.reload(ltchiptool)
if Version(ltchiptool.get_version()) < Version("1.3.1"):
raise ImportError("Version too old")
try:
check_ltchiptool()
except (ImportError, AttributeError):
print("Installing/updating ltchiptool")
system(" ".join([sys.executable, "-m", "pip install -U ltchiptool"]))
try:
check_ltchiptool()
except (ImportError, AttributeError) as e:
print(
f"!!! Installing ltchiptool failed, or version outdated. Cannot continue: {e}"
)
raise e
# Remove current dir so it doesn't conflict with PIO
sys.path.remove(dirname(__file__))
if dirname(__file__) in sys.path:
sys.path.remove(dirname(__file__))
libretuya_packages = None
manifest_default = {"version": "0.0.0", "description": "", "keywords": []}
@@ -189,7 +211,7 @@ class LibretuyaPlatform(PlatformBase):
def update_board(self, board: PlatformBoardConfig):
if "_base" in board:
board._manifest = get_board_manifest(board._manifest)
board._manifest = ltchiptool.Board.get_data(board._manifest)
# add "arduino" framework
has_arduino = any("arduino" in fw for fw in board.manifest["frameworks"])

View File

@@ -1,241 +0,0 @@
# Copyright (c) Kuba Szczodrzyński 2022-05-31.
import sys
from os.path import dirname, join
sys.path.append(join(dirname(__file__), ".."))
import shlex
from argparse import ArgumentParser
from enum import Enum
from os import stat, unlink
from os.path import basename, dirname, isfile, join
from shutil import copyfile
from subprocess import PIPE, Popen
from typing import IO, Dict, List, Tuple
from tools.util.fileio import chext, isnewer, readtext
from tools.util.models import Family
from tools.util.obj import get
from tools.util.platform import get_board_manifest, get_family
class SocType(Enum):
UNSET = ()
# (index, toolchain prefix, has dual-OTA, argument count)
AMBZ = (1, "arm-none-eabi-", True, 0)
BK72XX = (2, "arm-none-eabi-", False, 0)
def cmd(self, program: str, args: List[str] = []) -> IO[bytes]:
program = self.prefix + program
cmd = [program] + args
try:
process = Popen(cmd, stdout=PIPE)
except FileNotFoundError:
if isinstance(cmd, list):
cmd = " ".join(cmd)
print(f"Toolchain not found while running: '{cmd}'")
exit(1)
return process.stdout
@property
def prefix(self) -> str:
return self.value[1]
@property
def dual_ota(self) -> bool:
return self.value[2]
@property
def soc_argc(self) -> int:
return self.value[3]
def nm(self, input: str) -> Dict[str, int]:
out = {}
stdout = self.cmd("gcc-nm", [input])
for line in stdout.readlines():
line = line.decode().strip().split(" ")
if len(line) != 3:
continue
out[line[2]] = int(line[0], 16)
return out
def objcopy(
self,
input: str,
output: str,
sections: List[str] = [],
fmt: str = "binary",
) -> str:
# print graph element
print(f"| | |-- {basename(output)}")
if isnewer(input, output):
args = []
for section in sections:
args += ["-j", section]
args += ["-O", fmt]
args += [input, output]
self.cmd("objcopy", args).read()
return output
# _ _ _ _ _ _ _ _
# | | | | | (_) (_) | (_)
# | | | | |_ _| |_| |_ _ ___ ___
# | | | | __| | | | __| |/ _ \/ __|
# | |__| | |_| | | | |_| | __/\__ \
# \____/ \__|_|_|_|\__|_|\___||___/
def checkfile(path: str):
if not isfile(path) or stat(path).st_size == 0:
print(f"Generated file not found: {path}")
exit(1)
# ______ _ ______ _ ____ _____ _ _
# | ____| | | ____| | | | _ \_ _| \ | |
# | |__ | | | |__ | |_ ___ | |_) || | | \| |
# | __| | | | __| | __/ _ \ | _ < | | | . ` |
# | |____| |____| | | || (_) | | |_) || |_| |\ |
# |______|______|_| \__\___/ |____/_____|_| \_|
def elf2bin(
soc: SocType,
family: Family,
board: dict,
input: str,
ota_idx: int = 1,
args: List[str] = [],
) -> Tuple[int, str]:
checkfile(input)
func = None
if soc == SocType.AMBZ:
from tools.soc.link2bin_ambz import elf2bin_ambz
func = elf2bin_ambz
elif soc == SocType.BK72XX:
from tools.soc.link2bin_bk72xx import elf2bin_bk72xx
func = elf2bin_bk72xx
if func:
return func(soc, family, board, input, ota_idx, *args)
raise NotImplementedError(f"SoC ELF->BIN not implemented: {soc}")
# _ _ _
# | | (_) | |
# | | _ _ __ | | _____ _ __
# | | | | '_ \| |/ / _ \ '__|
# | |____| | | | | < __/ |
# |______|_|_| |_|_|\_\___|_|
def ldargs_parse(
args: List[str],
ld_ota1: str,
ld_ota2: str,
) -> List[Tuple[str, List[str]]]:
args1 = list(args)
args2 = list(args)
elf1 = elf2 = None
for i, arg in enumerate(args):
if ".elf" in arg:
if not ld_ota1:
# single-OTA chip, return the output name
return [(arg, args)]
# append OTA index in filename
args1[i] = elf1 = chext(arg, "ota1.elf")
args2[i] = elf2 = chext(arg, "ota2.elf")
if arg.endswith(".ld") and ld_ota1:
# use OTA2 linker script
args2[i] = arg.replace(ld_ota1, ld_ota2)
if not elf1 or not elf2:
print("Linker output .elf not found in arguments")
return None
return [(elf1, args1), (elf2, args2)]
def link2bin(
soc: SocType,
family: Family,
board: dict,
ld_args: List[str],
ld_ota1: str = None,
ld_ota2: str = None,
soc_args: List[str] = [],
) -> List[str]:
elfs = []
if soc.dual_ota:
# process linker arguments for dual-OTA chips
elfs = ldargs_parse(ld_args, ld_ota1, ld_ota2)
else:
# just get .elf output name for single-OTA chips
elfs = ldargs_parse(ld_args, None, None)
if not elfs:
return None
ota_idx = 1
for elf, ldargs in elfs:
# print graph element
print(f"|-- Image {ota_idx}: {basename(elf)}")
if isfile(elf):
unlink(elf)
soc.cmd(f"gcc", args=ldargs).read()
checkfile(elf)
# generate a set of binaries for the SoC
elf2bin(soc, family, board, elf, ota_idx, soc_args)
ota_idx += 1
if soc.dual_ota:
# copy OTA1 file as firmware.elf to make PIO understand it
elf, _ = ldargs_parse(ld_args, None, None)[0]
copyfile(elfs[0][0], elf)
if __name__ == "__main__":
parser = ArgumentParser(
prog="link2bin",
description="Link to BIN format",
prefix_chars="#",
)
parser.add_argument("board", type=str, help="Target board name")
parser.add_argument("ota1", type=str, help=".LD file OTA1 pattern")
parser.add_argument("ota2", type=str, help=".LD file OTA2 pattern")
parser.add_argument("args", type=str, nargs="*", help="SoC+linker arguments")
args = parser.parse_args()
try:
board = get_board_manifest(args.board)
except FileNotFoundError:
print(f"Board not found: {args.board}")
exit(1)
family = get_family(short_name=get(board, "build.family"))
soc_types = {soc.name.lower(): soc for soc in SocType}
soc = soc_types.get(family.code, soc_types.get(family.parent_code, None))
if not soc:
print(f"SoC type not found. Tried {family.code}, {family.parent_code}")
exit(1)
if not args.args:
print(f"Linker arguments must not be empty")
exit(1)
try:
while True:
i = next(i for i, a in enumerate(args.args) if a.startswith("@"))
arg = args.args.pop(i)
argv = readtext(arg[1:])
argv = shlex.split(argv)
args.args = args.args[0:i] + argv + args.args[i:]
except StopIteration:
pass
link2bin(
soc,
family,
board,
args.args[soc.soc_argc :],
args.ota1,
args.ota2,
args.args[: soc.soc_argc],
)

View File

@@ -1,69 +0,0 @@
# Copyright (c) Kuba Szczodrzyński 2022-06-14.
from os.path import basename
from typing import IO, Tuple
from tools.util.fileio import chname, isnewer, readbin
from tools.util.intbin import inttole32
from tools.util.models import Family
def elf2bin_ambz(
soc,
family: Family,
board: dict,
input: str,
ota_idx: int = 1,
) -> Tuple[int, str]:
def write_header(f: IO[bytes], start: int, end: int):
f.write(b"81958711")
f.write(inttole32(end - start))
f.write(inttole32(start))
f.write(b"\xff" * 16)
sections_ram = [
".ram_image2.entry",
".ram_image2.data",
".ram_image2.bss",
".ram_image2.skb.bss",
".ram_heap.data",
]
sections_xip = [".xip_image2.text"]
sections_rdp = [".ram_rdp.text"]
nmap = soc.nm(input)
ram_start = nmap["__ram_image2_text_start__"]
ram_end = nmap["__ram_image2_text_end__"]
xip_start = nmap["__flash_text_start__"] - 0x8000020
# build output name
output = chname(input, f"image_0x{xip_start:06X}.ota{ota_idx}.bin")
out_ram = chname(input, f"ota{ota_idx}.ram_2.r.bin")
out_xip = chname(input, f"ota{ota_idx}.xip_image2.bin")
out_rdp = chname(input, f"ota{ota_idx}.rdp.bin")
# print graph element
print(f"| |-- {basename(output)}")
# objcopy required images
ram = soc.objcopy(input, out_ram, sections_ram)
xip = soc.objcopy(input, out_xip, sections_xip)
soc.objcopy(input, out_rdp, sections_rdp)
# return if images are up to date
if not isnewer(ram, output) and not isnewer(xip, output):
return (xip_start, output)
# read and trim RAM image
ram = readbin(ram).rstrip(b"\x00")
# read XIP image
xip = readbin(xip)
# align images to 4 bytes
ram += b"\x00" * (((((len(ram) - 1) // 4) + 1) * 4) - len(ram))
xip += b"\x00" * (((((len(xip) - 1) // 4) + 1) * 4) - len(xip))
# write output file
with open(output, "wb") as f:
# write XIP header
write_header(f, 0, len(xip))
# write XIP image
f.write(xip)
# write RAM header
write_header(f, ram_start, ram_end)
# write RAM image
f.write(ram)
return (xip_start, output)

View File

@@ -1,95 +0,0 @@
# Copyright (c) Kuba Szczodrzyński 2022-06-14.
from datetime import datetime
from os import stat
from os.path import basename
from typing import Tuple
from tools.util.bkutil import RBL, BekenBinary, DataType
from tools.util.fileio import chext, chname, isnewer, writebin, writejson
from tools.util.models import Family
from tools.util.obj import get
def calc_offset(addr: int) -> int:
return int(addr + (addr // 32) * 2)
def elf2bin_bk72xx(
soc,
family: Family,
board: dict,
input: str,
ota_idx: int = 1,
) -> Tuple[int, str]:
mcu = get(board, "build.mcu")
coeffs = get(board, "build.bkcrypt_coeffs") or ("0" * 32)
rbl_size = get(board, "build.bkrbl_size_app")
version = datetime.now().strftime("%y.%m.%d")
nmap = soc.nm(input)
app_addr = nmap["_vector_start"]
app_offs = calc_offset(app_addr)
app_size = int(rbl_size, 16)
rbl_offs = app_offs
# build output name
output = chname(input, f"{mcu}_app_0x{app_offs:06X}.rbl")
fw_bin = chext(input, "bin")
# print graph element
print(f"| |-- {basename(output)}")
# objcopy ELF -> raw BIN
soc.objcopy(input, fw_bin)
# return if images are up to date
if not isnewer(fw_bin, output):
return (app_offs, output)
bk = BekenBinary(coeffs)
rbl = RBL(
name="app",
version=f"{version}-{mcu}",
container_size=app_size,
)
fw_size = stat(fw_bin).st_size
raw = open(fw_bin, "rb")
out = open(output, "wb")
# open encrypted+CRC binary output
out_crc = chname(input, f"{mcu}_app_0x{app_offs:06X}.crc")
print(f"| |-- {basename(out_crc)}")
crc = open(out_crc, "wb")
# get partial (type, bytes) data generator
package_gen = bk.package(raw, app_addr, fw_size, rbl, partial=True)
# write all BINARY blocks
for data_type, data in package_gen:
if data_type != DataType.BINARY:
break
out.write(data)
crc.write(data)
rbl_offs += len(data)
# skip PADDING_SIZE bytes for RBL header, write it to main output
if data_type == DataType.PADDING_SIZE:
out.write(b"\xff" * data)
rbl_offs += data
# open RBL header output
out_rblh = chname(input, f"{mcu}_app_0x{rbl_offs:06X}.rblh")
print(f"| |-- {basename(out_rblh)}")
rblh = open(out_rblh, "wb")
# write all RBL blocks
for data_type, data in package_gen:
if data_type != DataType.RBL:
break
out.write(data)
rblh.write(data)
# close all files
raw.close()
out.close()
crc.close()
rblh.close()

View File

@@ -1,59 +0,0 @@
# Copyright (c) Kuba Szczodrzyński 2022-06-23.
import sys
try:
from platformio.package.manager.tool import ToolPackageManager
manager = ToolPackageManager()
pkg = manager.get_package("tool-bk7231tools")
sys.path.append(pkg.path)
from bk7231tools.serial import BK7231Serial
except (ImportError, AttributeError):
print("You need PlatformIO and tool-bk7231tools package to run this program.")
exit(1)
from tools.upload.ctx import UploadContext
def upload_uart(
ctx: UploadContext,
port: str,
baud: int = None,
**kwargs,
) -> bool:
prefix = "| |--"
# connect to chip
bk = BK7231Serial(port=port, baudrate=baud or ctx.baudrate or 115200)
# collect continuous blocks of data
parts = ctx.collect(ota_idx=1)
# write blocks to flash
for offs, data in parts.items():
length = len(data.getvalue())
data.seek(0)
print(prefix, f"Writing {length} bytes to 0x{offs:06x}")
try:
bk.program_flash(
data,
length,
offs,
verbose=False,
crc_check=True,
dry_run=False,
really_erase=True,
)
except ValueError as e:
print(prefix, f"Writing failed: {e.args[0]}")
return False
# reboot the chip
bk.reboot_chip()
return True
def upload(ctx: UploadContext, protocol: str, **kwargs) -> bool:
if protocol == "uart":
return upload_uart(ctx, **kwargs)
print(f"Unknown upload protocol - {protocol}")
return False

View File

@@ -1,67 +0,0 @@
# Copyright (c) Kuba Szczodrzyński 2022-06-02.
from io import BytesIO
from tools.upload.ctx import UploadContext
from tools.upload.rtltool import RTLXMD
from tools.util.intbin import letoint
def upload_uart(
ctx: UploadContext,
port: str,
baud: int = None,
**kwargs,
) -> bool:
prefix = "| |--"
rtl = RTLXMD(port=port)
print(prefix, f"Connecting to {port}...")
if not rtl.connect():
print(prefix, f"Failed to connect on port {port}")
return False
# read system data to get active OTA index
io = BytesIO()
if not rtl.ReadBlockFlash(io, offset=0x9000, size=256):
print(prefix, "Failed to read from 0x9000")
return False
# get as bytes
system = io.getvalue()
if len(system) != 256:
print(prefix, f"Length invalid while reading from 0x9000 - {len(system)}")
return False
# read OTA switch value
ota_switch = bin(letoint(system[4:8]))[2:]
# count 0-bits
ota_idx = 1 + (ota_switch.count("0") % 2)
# validate OTA2 address in system data
if ota_idx == 2:
ota2_addr = letoint(system[0:4]) & 0xFFFFFF
part_addr = ctx.get_offset("ota2", 0)
if ota2_addr != part_addr:
print(
prefix,
f"Invalid OTA2 address on chip - found {ota2_addr}, expected {part_addr}",
)
return False
print(prefix, f"Flashing image to OTA {ota_idx}...")
# collect continuous blocks of data
parts = ctx.collect(ota_idx=ota_idx)
# write blocks to flash
for offs, data in parts.items():
offs |= 0x8000000
length = len(data.getvalue())
data.seek(0)
print(prefix, f"Writing {length} bytes to 0x{offs:06x}")
if not rtl.WriteBlockFlash(data, offs, length):
print(prefix, f"Writing failed at 0x{offs:x}")
return False
return True
def upload(ctx: UploadContext, protocol: str, **kwargs) -> bool:
if protocol == "uart":
return upload_uart(ctx, **kwargs)
print(f"Unknown upload protocol - {protocol}")
return False

View File

@@ -1,2 +0,0 @@
*.uf2
*.bin

View File

@@ -1,100 +0,0 @@
# Copyright (c) Kuba Szczodrzyński 2022-05-28.
from io import BytesIO, FileIO
from os import makedirs
from os.path import join
from typing import Dict, Tuple
from models import Opcode, Tag
from uf2 import UF2
from tools.util.intbin import inttole32, letoint, letosint
fs: Dict[str, Tuple[int, FileIO]] = {}
output_dir = ""
output_basename = ""
part1 = ""
part2 = ""
def write(part: str, offs: int, data: bytes):
global fs
if part not in fs or fs[part][0] != offs:
path = join(output_dir, output_basename + part + f"_0x{offs:x}.bin")
f = open(path, "wb")
if part in fs:
fs[part][1].close()
else:
f = fs[part][1]
fs[part] = (offs + f.write(data), f)
def update_parts(tags: Dict[Tag, bytes]):
global part1, part2
if Tag.LT_PART_1 in tags:
part1 = tags[Tag.LT_PART_1].decode()
part1 = ("1_" + part1) if part1 else None
if Tag.LT_PART_2 in tags:
part2 = tags[Tag.LT_PART_2].decode()
part2 = ("2_" + part2) if part2 else None
def uf2_dump(uf2: UF2, outdir: str):
global output_dir, output_basename
makedirs(outdir, exist_ok=True)
if Tag.LT_VERSION not in uf2.tags:
raise RuntimeError("Can only dump LibreTuya firmware images")
output_dir = outdir
output_basename = "_".join(
filter(
None,
[
uf2.tags.get(Tag.FIRMWARE, b"").decode(),
uf2.tags.get(Tag.VERSION, b"").decode(),
"lt" + uf2.tags[Tag.LT_VERSION].decode(),
uf2.tags.get(Tag.BOARD, b"").decode(),
],
)
)
output_basename += "_"
update_parts(uf2.tags)
for block in uf2.data:
# update target partition info
update_parts(block.tags)
# skip empty blocks
if not block.length:
continue
data1 = block.data if part1 else None
data2 = block.data if part2 else None
if Tag.LT_BINPATCH in block.tags:
# type 5, 6
data2 = bytearray(data2)
tag = block.tags[Tag.LT_BINPATCH]
binpatch = BytesIO(tag)
while binpatch.tell() < len(tag):
opcode = Opcode(binpatch.read(1)[0])
length = binpatch.read(1)[0]
data = binpatch.read(length)
if opcode == Opcode.DIFF32:
value = letosint(data[0:4])
for offs in data[4:]:
chunk = data2[offs : offs + 4]
chunk = letoint(chunk)
chunk += value
chunk = inttole32(chunk)
data2[offs : offs + 4] = chunk
data2 = bytes(data2)
if data1:
# types 1, 3, 4
write(part1, block.address, data1)
if data2:
# types 2, 3, 4
write(part2, block.address, data2)

View File

@@ -1,137 +0,0 @@
# Copyright (c) Kuba Szczodrzyński 2022-05-27.
from enum import IntEnum
class Tag(IntEnum):
VERSION = 0x9FC7BC # version of firmware file - UTF8 semver string
PAGE_SIZE = 0x0BE9F7 # page size of target device (32 bit unsigned number)
SHA2 = 0xB46DB0 # SHA-2 checksum of firmware (can be of various size)
DEVICE = 0x650D9D # description of device (UTF8)
DEVICE_ID = 0xC8A729 # device type identifier
# LibreTuya custom tags
OTA_VERSION = 0x5D57D0 # format version
BOARD = 0xCA25C8 # board name (lowercase code)
FIRMWARE = 0x00DE43 # firmware description / name
BUILD_DATE = 0x822F30 # build date/time as Unix timestamp
LT_VERSION = 0x59563D # LT version (semver)
LT_PART_1 = 0x805946 # OTA1 partition name
LT_PART_2 = 0xA1E4D7 # OTA2 partition name
LT_HAS_OTA1 = 0xBBD965 # image has any data for OTA1
LT_HAS_OTA2 = 0x92280E # image has any data for OTA2
LT_BINPATCH = 0xB948DE # binary patch to convert OTA1->OTA2
class Opcode(IntEnum):
DIFF32 = 0xFE # difference between 32-bit values
class Flags:
not_main_flash: bool = False
file_container: bool = False
has_family_id: bool = False
has_md5: bool = False
has_tags: bool = False
def encode(self) -> int:
val = 0
if self.not_main_flash:
val |= 0x00000001
if self.file_container:
val |= 0x00001000
if self.has_family_id:
val |= 0x00002000
if self.has_md5:
val |= 0x00004000
if self.has_tags:
val |= 0x00008000
return val
def decode(self, data: int):
self.not_main_flash = (data & 0x00000001) != 0
self.file_container = (data & 0x00001000) != 0
self.has_family_id = (data & 0x00002000) != 0
self.has_md5 = (data & 0x00004000) != 0
self.has_tags = (data & 0x00008000) != 0
def __str__(self) -> str:
flags = []
if self.not_main_flash:
flags.append("NMF")
if self.file_container:
flags.append("FC")
if self.has_family_id:
flags.append("FID")
if self.has_md5:
flags.append("MD5")
if self.has_tags:
flags.append("TAG")
return ",".join(flags)
class Input:
ota1_part: str = None
ota1_offs: int = 0
ota1_file: str = None
ota2_part: str = None
ota2_offs: int = 0
ota2_file: str = None
def __init__(self, input: str) -> None:
input = input.split(";")
n = len(input)
if n not in [2, 4]:
raise ValueError(
"Incorrect input format - should be part+offs;file[;part+offs;file]"
)
# just spread the same image twice for single-OTA scheme
if n == 2:
input += input
if input[0] and input[1]:
if "+" in input[0]:
(self.ota1_part, self.ota1_offs) = input[0].split("+")
self.ota1_offs = int(self.ota1_offs, 0)
else:
self.ota1_part = input[0]
self.ota1_file = input[1]
if input[2] and input[3]:
if "+" in input[2]:
(self.ota2_part, self.ota2_offs) = input[2].split("+")
self.ota2_offs = int(self.ota2_offs, 0)
else:
self.ota2_part = input[2]
self.ota2_file = input[3]
if self.ota1_file and self.ota2_file and self.ota1_offs != self.ota2_offs:
# currently, offsets cannot differ when storing images
# (this would require to actually store it twice)
raise ValueError(f"Offsets cannot differ ({self.ota1_file})")
@property
def is_single(self) -> bool:
return self.ota1_part == self.ota2_part and self.ota1_file == self.ota2_file
@property
def single_part(self) -> str:
return self.ota1_part or self.ota2_part
@property
def single_offs(self) -> int:
return self.ota1_offs or self.ota2_offs
@property
def single_file(self) -> str:
return self.ota1_file or self.ota2_file
@property
def has_ota1(self) -> bool:
return not not (self.ota1_part and self.ota1_file)
@property
def has_ota2(self) -> bool:
return not not (self.ota2_part and self.ota2_file)
@property
def is_simple(self) -> bool:
return self.ota1_file == self.ota2_file or not (self.has_ota1 and self.has_ota2)

View File

@@ -1,17 +0,0 @@
[tool.poetry]
name = "uf2ota"
version = "0.1.0"
description = "UF2 OTA update format"
authors = ["Kuba Szczodrzyński <kuba@szczodrzynski.pl>"]
license = "MIT"
[tool.poetry.dependencies]
python = "^3.7"
[tool.poetry.dev-dependencies]
black = "^22.3.0"
isort = "^5.10.1"
[build-system]
requires = ["poetry-core>=1.0.0"]
build-backend = "poetry.core.masonry.api"

View File

@@ -1,152 +0,0 @@
# Copyright (c) Kuba Szczodrzyński 2022-05-27.
from io import BytesIO, FileIO
from typing import Dict, List
from models import Tag
from uf2_block import Block
from tools.util.intbin import align_down, align_up, intto8, inttole16, inttole32
from tools.util.models import Family
class UF2:
f: FileIO
seq: int = 0
family: Family = None
tags: Dict[Tag, bytes] = {}
data: List[Block] = []
def __init__(self, f: FileIO) -> None:
self.f = f
def store(
self,
address: int,
data: bytes,
tags: Dict[Tag, bytes] = {},
block_size: int = 256,
):
if len(data) <= block_size:
block = Block(self.family)
block.tags = tags
block.address = address
block.data = data
block.length = len(data)
self.data.append(block)
return
for offs in range(0, len(data), block_size):
block = Block(self.family)
block.tags = tags
data_part = data[offs : offs + block_size]
block.address = address + offs
block.data = data_part
block.length = len(data_part)
self.data.append(block)
tags = {}
def put_str(self, tag: Tag, value: str):
self.tags[tag] = value.encode("utf-8")
def put_int32le(self, tag: Tag, value: int):
self.tags[tag] = inttole32(value)
def put_int16le(self, tag: Tag, value: int):
self.tags[tag] = inttole16(value)
def put_int8(self, tag: Tag, value: int):
self.tags[tag] = intto8(value)
def read(self, block_tags: bool = True) -> bool:
while True:
data = self.f.read(512)
if len(data) not in [0, 512]:
print(f"Block size invalid ({len(data)})")
return False
if not len(data):
break
block = Block()
if not block.decode(data):
return False
if self.family and self.family != block.family:
print(f"Mismatched family ({self.family} != {block.family})")
return False
self.family = block.family
if block.block_seq != self.seq:
print(f"Mismatched sequence number ({self.seq} != {block.block_seq}")
return False
self.seq += 1
if block_tags or not block.length:
self.tags.update(block.tags)
if block.length and not block.flags.not_main_flash:
self.data.append(block)
return True
def dump(self):
print(f"Family: {self.family.short_name} / {self.family.description}")
print(f"Tags:")
for k, v in self.tags.items():
if "\\x" not in str(v):
v = v.decode()
else:
v = v.hex()
print(f" - {k.name}: {v}")
print(f"Data chunks: {len(self.data)}")
print(f"Total binary size: {sum(bl.length for bl in self.data)}")
@property
def block_count(self) -> int:
cnt = len(self.data)
if self.tags:
cnt += 1
return cnt
def write_header(self):
comment = "Hi! Please visit https://kuba2k2.github.io/libretuya/ to read specifications of this file format."
bl = Block(self.family)
bl.flags.has_tags = True
bl.flags.not_main_flash = True
bl.block_seq = 0
bl.block_count = self.block_count
bl.tags = self.tags
data = bl.encode()
# add comment in the unused space
tags_len = align_up(Block.get_tags_length(bl.tags), 16)
comment_len = len(comment)
if 476 - 16 >= tags_len + comment_len:
space = 476 - 16 - tags_len
start = (space - comment_len) / 2
start = align_down(start, 16)
padding1 = b"\x00" * start
padding2 = b"\x00" * (476 - tags_len - comment_len - start)
data = (
data[0 : 32 + tags_len]
+ padding1
+ comment.encode()
+ padding2
+ data[-4:]
)
self.f.write(data)
def write(self):
if self.tags and self.seq == 0:
self.write_header()
self.seq += 1
bio = BytesIO()
for bl in self.data:
bl.block_count = self.block_count
bl.block_seq = self.seq
bio.write(bl.encode())
if self.seq % 128 == 0:
# write the buffer every 64 KiB
self.f.write(bio.getvalue())
bio = BytesIO()
self.seq += 1
self.f.write(bio.getvalue())

View File

@@ -1,139 +0,0 @@
# Copyright (c) Kuba Szczodrzyński 2022-05-27.
from math import ceil
from typing import Dict
from models import Flags, Tag
from tools.util.intbin import align_up, intto8, inttole24, inttole32, letoint
from tools.util.models import Family
from tools.util.platform import get_family
class Block:
flags: Flags
address: int = 0
length: int = 0
block_seq: int = 0
block_count: int = 0
file_size: int = 0
family: Family
data: bytes = None
md5_data: bytes = None
tags: Dict[Tag, bytes] = {}
def __init__(self, family: Family = None) -> None:
self.flags = Flags()
self.family = family
self.flags.has_family_id = not not self.family
def encode(self) -> bytes:
self.flags.has_tags = not not self.tags
# UF2 magic 1 and 2
data = b"\x55\x46\x32\x0A\x57\x51\x5D\x9E"
# encode integer variables
data += inttole32(self.flags.encode())
data += inttole32(self.address)
data += inttole32(self.length)
data += inttole32(self.block_seq)
data += inttole32(self.block_count)
if self.flags.file_container:
data += inttole32(self.file_size)
elif self.flags.has_family_id:
data += inttole32(self.family.id)
else:
data += b"\x00\x00\x00\x00"
if not self.data:
self.data = b""
# append tags
tags = b""
if self.flags.has_tags:
for k, v in self.tags.items():
tag_size = 4 + len(v)
tags += intto8(tag_size)
tags += inttole24(k.value)
tags += v
tag_size %= 4
if tag_size:
tags += b"\x00" * (4 - tag_size)
# append block data with padding
data += self.data
data += tags
data += b"\x00" * (476 - len(self.data) - len(tags))
data += b"\x30\x6F\xB1\x0A" # magic 3
return data
def decode(self, data: bytes) -> bool:
# check block size
if len(data) != 512:
print(f"Invalid block size ({len(data)})")
return False
# check Magic 1
if letoint(data[0:4]) != 0x0A324655:
print(f"Invalid Magic 1 ({data[0:4]})")
return False
# check Magic 2
if letoint(data[4:8]) != 0x9E5D5157:
print(f"Invalid Magic 2 ({data[4:8]})")
return False
# check Magic 3
if letoint(data[508:512]) != 0x0AB16F30:
print(f"Invalid Magic 13({data[508:512]})")
return False
self.flags.decode(letoint(data[8:12]))
self.address = letoint(data[12:16])
self.length = letoint(data[16:20])
self.block_seq = letoint(data[20:24])
self.block_count = letoint(data[24:28])
if self.flags.file_container:
self.file_size = letoint(data[28:32])
if self.flags.has_family_id:
self.family = get_family(id=letoint(data[28:32]))
if self.flags.has_md5:
self.md5_data = data[484:508] # last 24 bytes of data[]
# decode tags
self.tags = {}
if self.flags.has_tags:
tags = data[32 + self.length :]
i = 0
while i < len(tags):
length = tags[i]
if not length:
break
tag_type = letoint(tags[i + 1 : i + 4])
tag_data = tags[i + 4 : i + length]
self.tags[Tag(tag_type)] = tag_data
i += length
i = int(ceil(i / 4) * 4)
self.data = data[32 : 32 + self.length]
return True
@staticmethod
def get_tags_length(tags: Dict[Tag, bytes]) -> int:
out = 0
# add tag headers
out += 4 * len(tags)
# add all tag lengths, padded to 4 bytes
out += sum(align_up(l, 4) for l in map(len, tags.values()))
# add final 0x00 tag
out += 4
return out
def __str__(self) -> str:
flags = self.flags
address = hex(self.address)
length = hex(self.length)
block_seq = self.block_seq
block_count = self.block_count
file_size = self.file_size
family = self.family.short_name
tags = [(k.name, v) for k, v in self.tags.items()]
return f"Block[{block_seq}/{block_count}](flags={flags}, address={address}, length={length}, file_size={file_size}, family={family}, tags={tags})"

View File

@@ -1,145 +0,0 @@
# Copyright (c) Kuba Szczodrzyński 2022-05-27.
import sys
from os.path import dirname, join
sys.path.append(join(dirname(__file__), "..", ".."))
from argparse import ArgumentParser
from datetime import datetime
from zlib import crc32
from dump import uf2_dump
from models import Input, Tag
from uf2 import UF2
from uf2_block import Block
from utils import binpatch32
from tools.util.platform import get_family
BLOCK_SIZE = 256
def cli():
parser = ArgumentParser("uf2ota", description="UF2 OTA update format")
parser.add_argument("action", choices=["info", "dump", "write"])
parser.add_argument("inputs", nargs="+", type=str)
parser.add_argument("--output", help="Output .uf2 binary", type=str)
parser.add_argument("--family", help="Family name", type=str)
parser.add_argument("--board", help="Board name/code", type=str)
parser.add_argument("--version", help="LibreTuya core version", type=str)
parser.add_argument("--fw", help="Firmware name:version", type=str)
parser.add_argument("--date", help="Build date (Unix, default now)", type=int)
args = parser.parse_args()
if args.action == "info":
with open(args.inputs[0], "rb") as f:
uf2 = UF2(f)
if not uf2.read():
raise RuntimeError("Reading UF2 failed")
uf2.dump()
return
if args.action == "dump":
input = args.inputs[0]
outdir = input + "_dump"
with open(input, "rb") as f:
uf2 = UF2(f)
if not uf2.read(block_tags=False):
raise RuntimeError("Reading UF2 failed")
uf2_dump(uf2, outdir)
return
out = args.output or "out.uf2"
with open(out, "wb") as f:
uf2 = UF2(f)
uf2.family = get_family(args.family)
# store global tags (for entire file)
if args.board:
uf2.put_str(Tag.BOARD, args.board.lower())
key = f"LibreTuya {args.board.lower()}"
uf2.put_int32le(Tag.DEVICE_ID, crc32(key.encode()))
if args.version:
uf2.put_str(Tag.LT_VERSION, args.version)
if args.fw:
if ":" in args.fw:
(fw_name, fw_ver) = args.fw.split(":")
uf2.put_str(Tag.FIRMWARE, fw_name)
uf2.put_str(Tag.VERSION, fw_ver)
else:
uf2.put_str(Tag.FIRMWARE, args.fw)
uf2.put_int8(Tag.OTA_VERSION, 1)
uf2.put_str(Tag.DEVICE, "LibreTuya")
uf2.put_int32le(Tag.BUILD_DATE, args.date or int(datetime.now().timestamp()))
any_ota1 = False
any_ota2 = False
for input in args.inputs:
input = Input(input)
any_ota1 = any_ota1 or input.has_ota1
any_ota2 = any_ota2 or input.has_ota2
# store local tags (for this image only)
tags = {
Tag.LT_PART_1: input.ota1_part.encode() if input.has_ota1 else b"",
Tag.LT_PART_2: input.ota2_part.encode() if input.has_ota2 else b"",
}
if input.is_simple:
# single input image:
# - same image and partition (2 args)
# - same image but different partitions (4 args)
# - only OTA1 image
# - only OTA2 image
with open(input.single_file, "rb") as f:
data = f.read()
uf2.store(input.single_offs, data, tags, block_size=BLOCK_SIZE)
continue
# different images and partitions for both OTA schemes
with open(input.ota1_file, "rb") as f:
data1 = f.read()
with open(input.ota2_file, "rb") as f:
data2 = f.read()
if len(data1) != len(data2):
raise RuntimeError(
f"Images must have same lengths ({len(data1)} vs {len(data2)})"
)
for i in range(0, len(data1), 256):
block1 = data1[i : i + 256]
block2 = data2[i : i + 256]
if block1 == block2:
# blocks are identical, simply store them
uf2.store(
input.single_offs + i, block1, tags, block_size=BLOCK_SIZE
)
tags = {}
continue
# calculate max binpatch length (incl. existing tags and binpatch tag header)
max_length = 476 - BLOCK_SIZE - Block.get_tags_length(tags) - 4
# try 32-bit binpatch for best space optimization
binpatch = binpatch32(block1, block2, bladdr=i)
if len(binpatch) > max_length:
raise RuntimeError(
f"Binary patch too long - {len(binpatch)} > {max_length}"
)
tags[Tag.LT_BINPATCH] = binpatch
uf2.store(input.single_offs + i, block1, tags, block_size=BLOCK_SIZE)
tags = {}
uf2.put_int8(Tag.LT_HAS_OTA1, any_ota1 * 1)
uf2.put_int8(Tag.LT_HAS_OTA2, any_ota2 * 1)
uf2.write()
if __name__ == "__main__":
cli()

View File

@@ -1,72 +0,0 @@
# Copyright (c) Kuba Szczodrzyński 2022-05-27.
from typing import Dict, List, Tuple
from models import Opcode
from tools.util.intbin import intto8, letoint, sinttole32
def bindiff(
data1: bytes, data2: bytes, width: int = 1, single: bool = False
) -> Dict[int, Tuple[bytes, bytes]]:
out: Dict[int, Tuple[bytes, bytes]] = {}
offs = -1
diff1 = b""
diff2 = b""
for i in range(0, len(data1), width):
block1 = data1[i : i + width]
block2 = data2[i : i + width]
if block1 == block2:
# blocks are equal again
if offs != -1:
# store and reset current difference
out[offs] = (diff1, diff2)
offs = -1
diff1 = b""
diff2 = b""
continue
# blocks still differ
if single:
# single block per difference, so just store it
out[i] = (block1, block2)
else:
if offs == -1:
# difference starts here
offs = i
diff1 += block1
diff2 += block2
return out
def binpatch32(block1: bytes, block2: bytes, bladdr: int = 0) -> bytes:
# compare blocks:
# - in 4 byte (32 bit) chunks
# - report a single chunk in each difference
diffs = bindiff(block1, block2, width=4, single=True)
binpatch: Dict[int, List[int]] = {}
# gather all repeating differences (i.e. memory offsets for OTA1/OTA2)
for offs, diff in diffs.items():
(diff1, diff2) = diff
diff1 = letoint(diff1)
diff2 = letoint(diff2)
diff = diff2 - diff1
if diff in binpatch:
# difference already in this binpatch, add the offset
binpatch[diff].append(offs)
else:
# a new difference value
binpatch[diff] = [offs]
# print(f"Block at 0x{bladdr:x}+{offs:02x} -> {diff1:08x} - {diff2:08x} = {diff2-diff1:x}")
# print(f"Block at 0x{bladdr:x}: {len(binpatch)} difference(s) at {sum(len(v) for v in binpatch.values())} offsets")
# write binary patches
out = b""
for diff, offs in binpatch.items():
out += intto8(Opcode.DIFF32.value)
out += intto8(len(offs) + 4)
out += sinttole32(diff)
out += bytes(offs)
return out

View File

@@ -1,26 +0,0 @@
# Copyright (c) Kuba Szczodrzyński 2022-06-02.
from io import BytesIO
from tools.uf2ota.models import Opcode
from tools.util.intbin import inttole32, letoint, letosint
def binpatch_diff32(data: bytearray, patch: bytes) -> bytearray:
diff = letosint(patch[0:4])
for offs in patch[4:]:
value = letoint(data[offs : offs + 4])
value += diff
data[offs : offs + 4] = inttole32(value)
return data
def binpatch_apply(data: bytearray, binpatch: bytes) -> bytearray:
io = BytesIO(binpatch)
while io.tell() < len(binpatch):
opcode = io.read(1)[0]
length = io.read(1)[0]
bpdata = io.read(length)
if opcode == Opcode.DIFF32:
data = binpatch_diff32(data, bpdata)
return data

View File

@@ -1,164 +0,0 @@
# Copyright (c) Kuba Szczodrzyński 2022-06-02.
from datetime import datetime
from io import BytesIO
from typing import Dict, Tuple
from tools.uf2ota.models import Tag
from tools.uf2ota.uf2 import UF2
from tools.upload.binpatch import binpatch_apply
from tools.util.intbin import letoint
from tools.util.obj import get
from tools.util.platform import get_board_manifest
class UploadContext:
uf2: UF2
seq: int = 0
part1: str = None
part2: str = None
has_ota1: bool
has_ota2: bool
board_manifest: dict = None
def __init__(self, uf2: UF2) -> None:
self.uf2 = uf2
self.has_ota1 = uf2.tags.get(Tag.LT_HAS_OTA1, None) == b"\x01"
self.has_ota2 = uf2.tags.get(Tag.LT_HAS_OTA2, None) == b"\x01"
@property
def fw_name(self) -> str:
return self.uf2.tags.get(Tag.FIRMWARE, b"").decode()
@property
def fw_version(self) -> str:
return self.uf2.tags.get(Tag.VERSION, b"").decode()
@property
def lt_version(self) -> str:
return self.uf2.tags.get(Tag.LT_VERSION, b"").decode()
@property
def board(self) -> str:
return self.uf2.tags.get(Tag.BOARD, b"").decode()
@property
def build_date(self) -> datetime:
if Tag.BUILD_DATE not in self.uf2.tags:
return None
return datetime.fromtimestamp(letoint(self.uf2.tags[Tag.BUILD_DATE]))
@property
def baudrate(self) -> int:
if not self.board_manifest:
self.board_manifest = get_board_manifest(self.board)
return get(self.board_manifest, "upload.speed")
def get_offset(self, part: str, offs: int) -> int:
if not self.board_manifest:
self.board_manifest = get_board_manifest(self.board)
part = get(self.board_manifest, f"flash.{part}")
(offset, length) = map(lambda x: int(x, 16), part.split("+"))
if offs >= length:
return None
return offset + offs
def read(self, ota_idx: int = 1) -> Tuple[str, int, bytes]:
"""Read next available data block for the specified OTA scheme.
Returns:
Tuple[str, int, bytes]: target partition, relative offset, data block
"""
if ota_idx not in [1, 2]:
print(f"Invalid OTA index - {ota_idx}")
return None
if ota_idx == 1 and not self.has_ota1:
print(f"No data for OTA index - {ota_idx}")
return None
if ota_idx == 2 and not self.has_ota2:
print(f"No data for OTA index - {ota_idx}")
return None
for _ in range(self.seq, len(self.uf2.data)):
block = self.uf2.data[self.seq]
self.seq += 1
part1 = block.tags.get(Tag.LT_PART_1, None)
part2 = block.tags.get(Tag.LT_PART_2, None)
if part1 is not None and part2 is not None:
# decode empty tags too
self.part1 = part1.decode()
self.part2 = part2.decode()
elif part1 or part2:
print(f"Only one target partition specified - {part1} / {part2}")
return None
if not block.data:
continue
part = None
if ota_idx == 1:
part = self.part1
elif ota_idx == 2:
part = self.part2
if not part:
continue
# got data and target partition
offs = block.address
data = block.data
if ota_idx == 2 and Tag.LT_BINPATCH in block.tags:
binpatch = block.tags[Tag.LT_BINPATCH]
data = bytearray(data)
data = binpatch_apply(data, binpatch)
data = bytes(data)
return (part, offs, data)
return (None, 0, None)
def collect(self, ota_idx: int = 1) -> Dict[int, BytesIO]:
"""Read all UF2 blocks. Gather continuous data parts into sections
and their flashing offsets.
Returns:
Dict[int, BytesIO]: map of flash offsets to streams with data
"""
out: Dict[int, BytesIO] = {}
while True:
ret = self.read(ota_idx)
if not ret:
return False
(part, offs, data) = ret
if not data:
break
offs = self.get_offset(part, offs)
if offs is None:
return False
# find BytesIO in the dict
for io_offs, io_data in out.items():
if io_offs + len(io_data.getvalue()) == offs:
io_data.write(data)
offs = 0
break
if offs == 0:
continue
# create BytesIO at specified offset
io = BytesIO()
io.write(data)
out[offs] = io
# rewind BytesIO back to start
for io in out.values():
io.seek(0)
return out

View File

@@ -1,506 +0,0 @@
#!/usr/bin/env python
# RTL871xBx ROM Bootloader Utility Ver 12.01.2018
# Created on: 10.10.2017
# Author: pvvx
#
import argparse
import io
import os
import platform
import struct
import sys
import time
import serial
# Protocol bytes
SOH = b"\x01"
STX = b"\x02"
EOT = b"\x04"
ACK = b"\x06"
DLE = b"\x10"
NAK = b"\x15"
CAN = b"\x18"
CMD_USB = b"\x05" # UART Set Baud
CMD_XMD = b"\x07" # Go xmodem mode (write RAM/Flash mode)
CMD_EFS = b"\x17" # Erase Flash Sectors
CMD_RBF = b"\x19" # Read Block Flash
CMD_ABRT = b"\x1B" # End xmodem mode (write RAM/Flash mode)
CMD_GFS = b"\x21" # FLASH Get Status
CMD_SFS = b"\x26" # FLASH Set Status
# Protocol Mode
MODE_RTL = 0 # Rtl mode
MODE_XMD = 1 # xmodem mode
MODE_UNK1 = 3 # Unknown mode, test 1
MODE_UNK2 = 4 # Unknown mode, test 2
# Default baudrate
RTL_ROM_BAUD = 1500000
RTL_READ_BLOCK_SIZE = 1024
RTL_FLASH_SECTOR_SIZE = 4096
class RTLXMD:
def __init__(self, port=0, baud=RTL_ROM_BAUD, timeout=1):
self.mode = MODE_UNK1
try:
self._port = serial.Serial(port, baud)
self._port.timeout = timeout
except:
# raise Exception('Error open %s, %d baud' % (port, baud))
print("Error: Open %s, %d baud!" % (port, baud))
sys.exit(-1)
def writecmd(self, cmd, ok=ACK):
if self._port.write(cmd):
char = self._port.read(1)
if char:
if char == ok:
return True
return False
def WaitNAK(self):
chr_count = 128
while 1:
char = self._port.read(1)
if char:
if char == NAK:
return True
else:
return None
chr_count -= 1
if chr_count == 0:
return False
# return False
def sync(self, mode=MODE_RTL, flush=True, ready=7):
if flush:
self._port.flushOutput()
self._port.flushInput()
error_count = 0
cancel = 0
while True:
char = self._port.read(1)
if char:
if char == b"\x00":
continue
elif char == NAK:
# standard checksum requested (NAK)
if mode != self.mode:
if self.mode < MODE_UNK1:
if mode == MODE_RTL:
if self.writecmd(CMD_ABRT, CAN):
self.mode = MODE_RTL
# return True
break
elif mode == MODE_XMD:
if self.writecmd(CMD_XMD):
self.mode = MODE_XMD
break
else:
if mode == MODE_XMD:
if self.writecmd(CMD_XMD):
self.mode = MODE_XMD
break
self.mode = MODE_RTL
break
elif char == CAN:
# received CAN
if cancel:
# Transmission canceled: received 2xCAN at start-sequence
return False
else:
# Cancellation at start sequence
cancel = 1
# else:
# send error: expected NAK, or CAN
# print 'Not NAK or CAN: %02x' % (ord(char))
else:
if self.mode == MODE_UNK1:
if self.writecmd(CMD_XMD):
self.mode = MODE_XMD
if mode == MODE_XMD:
return True
if self.writecmd(CMD_ABRT, CAN):
self.mode = MODE_RTL
return True
self.mode = MODE_UNK2
elif self.mode == MODE_UNK2:
if self.writecmd(CMD_ABRT, CAN):
self.mode = MODE_RTL
if mode == MODE_RTL:
return True
if self.writecmd(CMD_XMD):
self.mode = MODE_XMD
return True
self.mode = MODE_UNK1
error_count += 1
if error_count > ready:
if self.mode == MODE_XMD:
# send error: error_count reached 15, aborting.
self._port.write(CAN)
self._port.write(CAN)
return False
return True
def ModeXmodem(self):
if self.sync():
ret = self.writecmd(CMD_XMD)
if ret == True:
self.mode = 1
return ret
return None
def RtlMode(self):
if self.sync():
ret = self.writecmd(CMD_ABRT, CAN)
if ret == True:
self.mode = 0
return ret
return None
def GetFlashStatus(self):
if self.sync():
self._port.write(CMD_GFS)
return self._port.read(1)
return None
def SetFlashStatus(self, status):
if self.sync():
if self.writecmd([CMD_SFS, status]):
return self.GetFlashStatus()
return None
def ReadBlockFlash(self, stream, offset=0, size=0x200000):
# Read sectors size: 4 block 1024 bytes, else not set ACK!
count = int((size + RTL_FLASH_SECTOR_SIZE - 1) / RTL_FLASH_SECTOR_SIZE)
offset &= 0xFFFFFF
if count > 0 and count < 0x10000 and offset >= 0: # 1 byte .. 16 Mbytes
ret = self.sync()
if ret:
ret = self._port.write(
struct.pack(
"<BHBH",
ord(CMD_RBF),
offset & 0xFFFF,
int(offset / 0x10000) & 0xFF,
count,
)
)
count *= 4
if ret:
for _ in range(count):
data = self._port.read(RTL_READ_BLOCK_SIZE)
if data:
ret = self._port.write(ACK)
if ret:
if size > RTL_READ_BLOCK_SIZE:
stream.write(data)
elif size > 0:
stream.write(data[:size])
else:
return ret
else:
return False
size -= RTL_READ_BLOCK_SIZE
if size <= 0:
ret = self.sync()
else:
ret = False
return ret
def connect(self):
# issue reset-to-bootloader:
# RTS = either RESET (both active low = chip in reset)
# DTR = GPIOA_30 (active low = boot to flasher)
self._port.setDTR(False)
self._port.setRTS(True)
time.sleep(0.05)
self._port.setDTR(True)
self._port.setRTS(False)
time.sleep(0.05)
self._port.setDTR(False)
return self.GetFlashStatus()
def EraseSectorsFlash(self, offset=0, size=0x200000):
count = int((size + RTL_FLASH_SECTOR_SIZE - 1) / RTL_FLASH_SECTOR_SIZE)
offset &= 0xFFF000
if count > 0 and count < 0x10000 and offset >= 0: # 1 byte .. 16 Mbytes
for i in range(count):
ret = self.sync()
if ret:
# print '\r%d' % i
ret = self.writecmd(
struct.pack(
"<BHBH",
ord(CMD_EFS),
offset & 0xFFFF,
int(offset / 0x10000) & 0xFF,
1,
)
)
if not ret:
return ret
offset += RTL_FLASH_SECTOR_SIZE
ret = self.sync()
else:
ret = False
return ret
def calc_checksum(self, data, checksum=0):
if platform.python_version_tuple() >= ("3", "0", "0"):
return (sum(data) + checksum) % 256
else:
return (sum(map(ord, data)) + checksum) % 256
def send_xmodem(self, stream, offset, size, retry=3):
ret = self.sync(MODE_XMD)
if ret:
sequence = 1
while size > 0:
if size <= 128:
packet_size = 128
cmd = SOH
else:
packet_size = 1024
cmd = STX
rdsize = packet_size
if size < rdsize:
rdsize = size
data = stream.read(rdsize)
if not data: # end of stream
print("send: at EOF")
return False
data = data.ljust(packet_size, b"\xFF")
pkt = (
struct.pack("<BBBI", ord(cmd), sequence, 0xFF - sequence, offset)
+ data
)
crc = self.calc_checksum(pkt[3:])
pkt += struct.pack("<B", crc)
error_count = 0
while True:
ret = self.writecmd(pkt)
if ret:
sequence = (sequence + 1) % 0x100
offset += packet_size
size -= rdsize
break
else:
error_count += 1
if error_count > retry:
return False
ret = self.writecmd(EOT) # if write SRAM -> (*0x10002000)()
self.mode = MODE_RTL
return ret
def WriteBlockSRAM(self, stream, offset=0x10002000, size=0x1000, retry=3):
offset &= 0x00FFFFFF
offset |= 0x10000000
return self.send_xmodem(stream, offset, size, retry)
def WriteBlockFlash(self, stream, offset=0x10010000, size=0x1000, retry=3):
offset &= 0x00FFFFFF
offset |= 0x08000000
return self.send_xmodem(stream, offset, size, retry)
def arg_auto_int(x):
return int(x, 0)
if __name__ == "__main__":
parser = argparse.ArgumentParser(
description="RT871xBx ROM Bootloader Utility", prog="rtltool"
)
parser.add_argument("--port", "-p", help="Serial port device", required=True)
parser.add_argument(
"--go", "-g", action="store_true", help="Run after performing the operation"
)
subparsers = parser.add_subparsers(
dest="operation",
help="Run rtltool {command} -h for additional help",
required=True,
)
parser_read_flash = subparsers.add_parser(
"rf", help="Read Flash data to binary file"
)
parser_read_flash.add_argument("address", help="Start address", type=arg_auto_int)
parser_read_flash.add_argument("size", help="Size of region", type=arg_auto_int)
parser_read_flash.add_argument("filename", help="Name of binary file")
parser_write_flash = subparsers.add_parser(
"wf", help="Write a binary file to Flash data"
)
parser_write_flash.add_argument("address", help="Start address", type=arg_auto_int)
parser_write_flash.add_argument("filename", help="Name of binary file")
parser_write_mem = subparsers.add_parser(
"wm", help="Write a binary file to SRAM memory"
)
parser_write_mem.add_argument("address", help="Start address", type=arg_auto_int)
# parser_write_mem.add_argument('size', help='Size of region', type=arg_auto_int)
parser_write_mem.add_argument("filename", help="Name of binary file")
parser_erase_flash = subparsers.add_parser("es", help="Erase Sectors Flash")
parser_erase_flash.add_argument("address", help="Start address", type=arg_auto_int)
parser_erase_flash.add_argument("size", help="Size of region", type=arg_auto_int)
parser_get_status_flash = subparsers.add_parser(
"gf", help="Get Flash Status register"
)
parser_set_status_flash = subparsers.add_parser(
"sf", help="Set Flash Status register"
)
parser_boot_flash = subparsers.add_parser("bf", help="Start boot flash")
parser_set_status_flash = subparsers.add_parser("gm", help="Go ROM Monitor")
args = parser.parse_args()
rtl = RTLXMD(args.port)
print("Connecting...")
if rtl.connect():
if args.operation == "wf":
stream = open(args.filename, "rb")
size = os.path.getsize(args.filename)
if size < 1:
stream.close
print("Error: File size = 0!")
sys.exit(-1)
offset = args.address & 0x00FFFFFF
offset |= 0x08000000
print(
"Write Flash data 0x%08x to 0x%08x from file: %s ..."
% (offset, offset + size, args.filename)
)
if not rtl.WriteBlockFlash(stream, args.address, size):
stream.close
print("Error: Write Flash!")
sys.exit(-2)
stream.close
# print 'Done!'
# sys.exit(0)
elif args.operation == "rf":
print(
"Read Flash data from 0x%08x to 0x%08x in file: %s ..."
% (args.address, args.address + args.size, args.filename)
)
stream = open(args.filename, "wb")
if not rtl.ReadBlockFlash(stream, args.address, args.size):
stream.close
print("Error!")
sys.exit(-2)
stream.close
# print 'Done!'
# sys.exit(0)
elif args.operation == "wm":
stream = open(args.filename, "rb")
size = os.path.getsize(args.filename)
if size < 1:
stream.close
print("Error: File size = 0!")
sys.exit(-1)
offset = args.address & 0x00FFFFFF
offset |= 0x10000000
print(
"Write SRAM at 0x%08x to 0x%08x from file: %s ..."
% (args.address, args.address + size, args.filename)
)
if not rtl.WriteBlockSRAM(stream, args.address, size):
stream.close
print("Error: Write Flash!")
sys.exit(-2)
stream.close
print("Done!")
sys.exit(0)
elif args.operation == "es":
count = (args.size + RTL_FLASH_SECTOR_SIZE - 1) / RTL_FLASH_SECTOR_SIZE
size = count * RTL_FLASH_SECTOR_SIZE
offset = args.address & 0xFFF000
print(
"Erase Flash %d sectors, data from 0x%08x to 0x%08x ..."
% (count, offset, offset + size)
)
if rtl.EraseSectorsFlash(offset, size):
print("Done!")
sys.exit(0)
print("Error: Erase Flash sectors!")
sys.exit(-2)
elif args.operation == "gf":
fsta = rtl.GetFlashStatus()
if fsta:
print("Flash Status value: 0x%02x" % (ord(fsta)))
sys.exit(0)
print("Error: Get Flash Status!")
sys.exit(-2)
elif args.operation == "sf":
print("Set Flash Status value: 0x%02x" % (args.value & 0xFF))
if rtl.SetFlashStatus(args.value & 0xFF):
sys.exit(0)
print("Error: Set Flash Status!")
sys.exit(-2)
elif args.operation == "bf":
print("BOOT_ROM_FromFlash()...") # ROM-Call:00005404
stream = io.BytesIO(b"\x05\x54\x00\x00")
if not rtl.WriteBlockSRAM(
stream, 0x10002000, 4
): # [0x10002000] = 0x00005405
stream.close
print("Error!")
sys.exit(-2)
print("Done!")
rtl._port.close()
rtl._port.baudrate = 115200
rtl._port.open()
rtl._port.timeout = 1
sio = io.TextIOWrapper(io.BufferedRWPair(rtl._port, rtl._port))
print(
sio.readline(),
sio.readline(),
sio.readline(),
sio.readline(),
sio.readline(),
)
sys.exit(0)
elif args.operation == "gm":
stream = io.BytesIO(
b"\x19\x20\x00\x10\x19\x20\x00\x10\x19\x20\x00\x10\x19\x20\x00\x10\x19\x20\x00\x10\x00\x00\x00\x00\x08\xb5\x02\x4c\x4f\xf4\x7a\x70\xa0\x47\xfb\xe7\x05\x22\x00\x00"
)
if not rtl.WriteBlockSRAM(stream, 0x10002000, 40): # [0x10002000] = ...
stream.close
print("Error!")
sys.exit(-2)
print("Done!")
sys.exit(0)
else:
print("Failed to connect device on", args.port, "!")
sys.exit(-2)
if args.go:
if not rtl.WaitNAK() or rtl.writecmd(CMD_GFS, 0) == None:
print("Error: Sync!")
sys.exit(-2)
print("BOOT FromFlash...") # ROM-Call:00005404
stream = io.BytesIO(b"\x05\x54\x00\x00")
if not rtl.WriteBlockSRAM(stream, 0x10002000, 4): # [0x10002000] = 0x00005405
stream.close
print("Error!")
sys.exit(-2)
print("Done!")
sys.exit(0)

View File

@@ -1,58 +0,0 @@
# Copyright (c) Kuba Szczodrzyński 2022-06-02.
import sys
from os.path import dirname, join
from time import time
sys.path.append(join(dirname(__file__), "..", ".."))
sys.path.append(join(dirname(__file__), "..", "uf2ota"))
from argparse import ArgumentParser, FileType
from tools.uf2ota.uf2 import UF2
from tools.upload.ctx import UploadContext
# TODO document this tool
if __name__ == "__main__":
parser = ArgumentParser("uf2upload", description="UF2 uploader")
parser.add_argument("file", type=FileType("rb"), help=".uf2 file")
subp = parser.add_subparsers(dest="protocol", help="Upload protocol", required=True)
parser_uart = subp.add_parser("uart", help="UART uploader")
parser_uart.add_argument("port", type=str, help="Serial port device")
parser_uart.add_argument("-b", "--baud", type=int, help="Serial baudrate")
args = parser.parse_args()
uf2 = UF2(args.file)
if not uf2.read(block_tags=False):
exit(1)
ctx = UploadContext(uf2)
print(
f"|-- {ctx.fw_name} {ctx.fw_version} @ {ctx.build_date} -> {ctx.board} via {args.protocol}"
)
start = time()
args = dict(args._get_kwargs())
if uf2.family.code == "ambz":
from tools.soc.uf2_rtltool import upload
if not upload(ctx, **args):
exit(1)
elif uf2.family.parent_code == "bk72xx":
from tools.soc.uf2_bk72xx import upload
if not upload(ctx, **args):
exit(1)
else:
print(f"Unsupported upload family - {uf2.family.name}")
exit(1)
duration = time() - start
print(f"|-- Finished in {duration:.3f} s")
exit(0)

View File

@@ -1,80 +0,0 @@
# Copyright (c) Kuba Szczodrzyński 2022-06-10.
from typing import List, Tuple, Union
from tools.util.intbin import uintmax
from tools.util.obj import SliceLike, slice2int
def bitcat(*vars: Tuple[Union["BitInt", int], SliceLike]) -> int:
"""Concat all 'vars' denoted in a (value, slice) format into a bitstring."""
out = 0
for val, sl in vars:
if not isinstance(val, BitInt):
val = BitInt(val)
(start, stop) = slice2int(sl)
out <<= start - stop + 1
out |= val[start:stop]
return out
def bitcatraw(*vars: Tuple[int, int]) -> int:
"""Concat all 'vars' denoted in a (value, bitwidth) format into a bitstring."""
out = 0
for val, bits in vars:
out <<= bits
out |= val
return out
class BitInt(int):
"""
Wrapper for int supporting slice reading and assignment of
individual bits (counting from LSB to MSB, like '7:0').
"""
value: int = None
def __init__(self, value: int) -> None:
self.value = value
def __getitem__(self, key):
if self.value is None:
self.value = self
# for best performance, slice2int() type checking was disabled
if isinstance(key, int):
return (self.value >> key) % 2
# (start, stop) = slice2int(key)
return (self.value >> key.stop) & uintmax(key.start - key.stop + 1)
def __setitem__(self, key, value):
if self.value is None:
self.value = self
(start, stop) = slice2int(key)
if value > uintmax(start - stop + 1):
raise ValueError("value is too big")
tmp = self.value & ~uintmax(start + 1)
tmp |= self.value & uintmax(stop)
tmp |= value << stop
self.value = tmp
def rep(self, n: int, sl: Union[SliceLike, List[SliceLike]]) -> int:
"""Construct a bitstring from 'sl' (being a single slice or a list)
repeated 'n' times."""
if isinstance(sl, list):
return self.cat(*(sl * n))
return self.cat(*([sl] * n))
def cat(self, *slices: SliceLike) -> int:
"""Construct a bitstring from this BitInt's parts denoted by 'slices'."""
out = 0
for sl in slices:
(start, stop) = slice2int(sl)
out <<= start - stop + 1
out |= self[start:stop]
return out
def __int__(self) -> int:
return self.value or self

View File

@@ -1,167 +0,0 @@
# Copyright (c) Kuba Szczodrzyński 2022-06-10.
from typing import List, Tuple
from tools.util.bitint import BitInt, bitcatraw
def pn15(addr: int) -> int:
# wire [15:0] pn_tmp = {addr[6:0], addr[15:7]} ^ {16'h6371 & {4{addr[8:5]}}};
a = ((addr % 0x80) * 0x200) + ((addr // 0x80) % 0x200)
b = (addr // 0x20) % 0x10
c = 0x6371 & (b * 0x1111)
return a ^ c
def pn16(addr: int) -> int:
# wire [16:0] pn_tmp = {addr[9:0], addr[16:10]} ^ {17'h13659 & {addr[4],{4{addr[1],addr[5],addr[9],addr[13]}}}};
a = ((addr % 0x400) * 0x80) + ((addr // 0x400) % 0x80)
b = (addr // 0x2000) % 2
b += ((addr // 0x200) % 2) * 2
b += ((addr // 0x20) % 2) * 4
b += ((addr // 0x2) % 2) * 8
c = (addr // 0x10) % 2
d = 0x13659 & (c * 0x10000 + b * 0x1111)
return a ^ d
def pn32(addr: int) -> int:
# wire [31:0] pn_tmp = {addr[14:0], addr[31:15]} ^ {32'hE519A4F1 & {8{addr[5:2]}}};
a = ((addr % 0x8000) * 0x20000) + ((addr // 0x8000) % 0x20000)
b = (addr // 0x4) % 0x10
c = 0xE519A4F1 & (b * 0x11111111)
return a ^ c
class BekenCrypto:
# translated from https://github.com/ghsecuritylab/tysdk_for_bk7231/blob/master/toolchain/encrypt_crc/abc.c
coef0: BitInt
coef1_mix: int
coef1_hi16: int
bypass: bool = False
pn15_args: List[slice] = None
pn16_args: slice = None
pn32_args: Tuple[int, int] = None
random: int = 0
def __init__(self, coeffs: List[BitInt]) -> None:
(self.coef0, coef1, coef2, coef3) = coeffs
# wire g_bypass = (coef3[31:24] == 8'hFF) | (coef3[31:24] == 8'h00);
self.bypass = coef3[31:24] in [0x00, 0xFF]
if self.bypass:
return
# wire pn16_bit = coef3[4];
# wire[16:0] pn16_addr = pn16_A ^ {coef1[15:8], pn16_bit, coef1[7:0]};
self.coef1_mix = bitcatraw((coef1[15:8], 8), (coef3[4], 1), (coef1[7:0], 8))
self.coef1_hi16 = coef1[31:16]
# wire pn15_bps = g_bypass | coef3[0];
pn15_bps = coef3[0]
# wire pn16_bps = g_bypass | coef3[1];
pn16_bps = coef3[1]
# wire pn32_bps = g_bypass | coef3[2];
pn32_bps = coef3[2]
# wire rand_bps = g_bypass | coef3[3];
rand_bps = coef3[3]
if coef3[3:0] == 0xF:
self.bypass = True
return
if not pn15_bps:
# wire[1:0] pn15_sel = coef3[ 6: 5];
pn15_sel = coef3[6:5]
# wire[15:0] pn15_A = (pn15_sel == 0) ? ({addr[31:24], addr[23:16]} ^ {addr[15:8], addr[ 7:0]}) :
# (pn15_sel == 1) ? ({addr[31:24], addr[23:16]} ^ {addr[ 7:0], addr[15:8]}) :
# (pn15_sel == 2) ? ({addr[23:16], addr[31:24]} ^ {addr[15:8], addr[ 7:0]}) :
# ({addr[23:16], addr[31:24]} ^ {addr[ 7:0], addr[15:8]});
if pn15_sel == 0:
self.pn15_args = [
slice(31, 24),
slice(23, 16),
slice(15, 8),
slice(7, 0),
]
elif pn15_sel == 1:
self.pn15_args = [
slice(31, 24),
slice(23, 16),
slice(7, 0),
slice(15, 8),
]
elif pn15_sel == 2:
self.pn15_args = [
slice(23, 16),
slice(31, 24),
slice(15, 8),
slice(7, 0),
]
else:
self.pn15_args = [
slice(23, 16),
slice(31, 24),
slice(7, 0),
slice(15, 8),
]
if not pn16_bps:
# wire[1:0] pn16_sel = coef3[ 9: 8];
pn16_sel = coef3[9:8]
# wire[16:0] pn16_A = (pn16_sel == 0) ? addr[16:0] :
# (pn16_sel == 1) ? addr[17:1] :
# (pn16_sel == 2) ? addr[18:2] :
# addr[19:3];
self.pn16_args = slice(16 + pn16_sel, pn16_sel)
if not pn32_bps:
# wire[1:0] pn32_sel = coef3[12:11];
pn32_sel = coef3[12:11]
# wire[31:0] pn32_A = (pn32_sel == 0) ? addr[31:0] :
# (pn32_sel == 1) ? {addr[ 7:0], addr[31: 8]} :
# (pn32_sel == 2) ? {addr[15:0], addr[31:16]} :
# {addr[23:0], addr[31:24]};
PN32_SHIFTS = (
(0, 0),
(2**8, 2**24),
(2**16, 2**16),
(2**24, 2**8),
)
self.pn32_args = PN32_SHIFTS[pn32_sel]
# wire[31:0] random = rand_bps ? 32'h00000000 : coef2[31:0];
self.random = 0 if rand_bps else coef2
def encrypt_u32(self, addr: int, data: int) -> int:
if self.bypass:
return data
addr = BitInt(addr)
pn15_v = 0
pn16_v = 0
pn32_v = 0
if self.pn15_args:
pn15_a = (addr[self.pn15_args[0]] * 0x100) + addr[self.pn15_args[1]]
pn15_b = (addr[self.pn15_args[2]] * 0x100) + addr[self.pn15_args[3]]
pn15_A = pn15_a ^ pn15_b
# wire[15:0] pn15_addr = pn15_A ^ coef1[31:16];
pn15_addr = pn15_A ^ self.coef1_hi16
pn15_v = pn15(pn15_addr)
if self.pn16_args:
pn16_A = addr[self.pn16_args]
# wire[16:0] pn16_addr = pn16_A ^ {coef1[15:8], pn16_bit, coef1[7:0]};
pn16_addr = pn16_A ^ self.coef1_mix
pn16_v = pn16(pn16_addr)
if self.pn32_args:
pn32_A = (addr // self.pn32_args[0]) + (addr * self.pn32_args[1])
# wire[31:0] pn32_addr = pn32_A ^ coef0[31:0];
pn32_addr = pn32_A ^ self.coef0
pn32_v = pn32(pn32_addr)
# assign pnout = pn32[31:0] ^ {pn15[15:0], pn16[15:0]} ^ random[31:0];
pnout = pn32_v ^ ((pn15_v * 0x10000) + (pn16_v % 0x10000)) ^ self.random
return data ^ pnout

View File

@@ -1,352 +0,0 @@
# Copyright (c) Kuba Szczodrzyński 2022-06-10.
import sys
from os.path import dirname, join
sys.path.append(join(dirname(__file__), "..", ".."))
from argparse import ArgumentParser, FileType
from binascii import crc32
from dataclasses import dataclass, field
from enum import Enum, IntFlag
from io import SEEK_SET, FileIO
from os import stat
from struct import Struct
from time import time
from typing import Generator, Tuple, Union
from tools.util.bitint import BitInt
from tools.util.bkcrypto import BekenCrypto
from tools.util.crc16 import CRC16
from tools.util.fileio import readbin, writebin
from tools.util.intbin import (
ByteGenerator,
align_up,
betoint,
biniter,
fileiter,
geniter,
inttobe16,
inttole32,
letoint,
pad_data,
pad_up,
)
class DataType(Enum):
BINARY = "BINARY"
PADDING_SIZE = "PADDING_SIZE"
RBL = "RBL"
DataTuple = Tuple[DataType, Union[bytes, int]]
DataUnion = Union[bytes, DataTuple]
DataGenerator = Generator[DataUnion, None, None]
class OTAAlgorithm(IntFlag):
NONE = 0
CRYPT_XOR = 1
CRYPT_AES256 = 2
COMPRESS_GZIP = 256
COMPRESS_QUICKLZ = 512
COMPRESS_FASTLZ = 768
@dataclass
class RBL:
ota_algo: OTAAlgorithm = OTAAlgorithm.NONE
timestamp: float = field(default_factory=time)
name: Union[str, bytes] = "app"
version: Union[str, bytes] = "1.00"
sn: Union[str, bytes] = "0" * 23
data_crc: int = 0
data_hash: int = 0x811C9DC5 # https://github.com/znerol/py-fnvhash/blob/master/fnvhash/__init__.py
raw_size: int = 0
data_size: int = 0
container_size: int = 0
has_part_table: bool = False
@property
def container_size_crc(self) -> int:
return int(self.container_size + (self.container_size // 32) * 2)
def update(self, data: bytes):
self.data_crc = crc32(data, self.data_crc)
for byte in data:
if self.data_size < self.raw_size:
self.data_hash ^= byte
self.data_hash *= 0x01000193
self.data_hash %= 0x100000000
self.data_size += 1
def serialize(self) -> bytes:
if isinstance(self.name, str):
self.name = self.name.encode()
if isinstance(self.version, str):
self.version = self.version.encode()
if isinstance(self.sn, str):
self.sn = self.sn.encode()
# based on https://github.com/khalednassar/bk7231tools/blob/main/bk7231tools/analysis/rbl.py
struct = Struct("<4sII16s24s24sIIII") # without header CRC
rbl = struct.pack(
b"RBL\x00",
self.ota_algo,
int(self.timestamp),
pad_data(self.name, 16, 0x00),
pad_data(self.version, 24, 0x00),
pad_data(self.sn, 24, 0x00),
self.data_crc,
self.data_hash,
self.raw_size,
self.data_size,
)
return rbl + inttole32(crc32(rbl))
@classmethod
def deserialize(cls, data: bytes) -> "RBL":
crc_found = letoint(data[-4:])
data = data[:-4]
crc_expected = crc32(data)
if crc_expected != crc_found:
raise ValueError(
f"Invalid RBL CRC (expected {crc_expected:X}, found {crc_found:X})"
)
struct = Struct("<II16s24s24sIIII") # without magic and header CRC
rbl = cls(*struct.unpack(data[4:]))
rbl.ota_algo = OTAAlgorithm(rbl.ota_algo)
rbl.name = rbl.name.partition(b"\x00")[0].decode()
rbl.version = rbl.version.partition(b"\x00")[0].decode()
rbl.sn = rbl.sn.partition(b"\x00")[0].decode()
return rbl
class BekenBinary:
crypto: BekenCrypto
def __init__(self, coeffs: Union[bytes, str] = None) -> None:
if coeffs:
if isinstance(coeffs, str):
coeffs = bytes.fromhex(coeffs)
if len(coeffs) != 16:
raise ValueError(
f"Invalid length of encryption coefficients: {len(coeffs)}"
)
coeffs = list(map(BitInt, map(betoint, biniter(coeffs, 4))))
self.crypto = BekenCrypto(coeffs)
def crc(self, data: ByteGenerator, type: DataType = None) -> DataGenerator:
for block in geniter(data, 32):
crc = CRC16.CMS.calc(block)
block += inttobe16(crc)
if type:
yield (type, block)
else:
yield block
def uncrc(self, data: ByteGenerator, check: bool = True) -> ByteGenerator:
for block in geniter(data, 34):
if check:
crc = CRC16.CMS.calc(block[0:32])
crc_found = betoint(block[32:34])
if crc != crc_found:
print(f"CRC invalid: expected={crc:X}, found={crc_found:X}")
return
yield block[0:32]
def crypt(self, addr: int, data: ByteGenerator) -> ByteGenerator:
for word in geniter(data, 4):
word = letoint(word)
word = self.crypto.encrypt_u32(addr, word)
word = inttole32(word)
yield word
addr += 4
def package(
self,
f: FileIO,
addr: int,
size: int,
rbl: RBL,
partial: bool = False,
) -> DataGenerator:
if not rbl.container_size:
raise ValueError("RBL must have a total size when packaging")
crc_total = 0
# yield all data as (type, bytes) tuples, if partial mode enabled
type_binary = DataType.BINARY if partial else None
type_padding = DataType.PADDING_SIZE if partial else None
type_rbl = DataType.RBL if partial else None
# when to stop reading input data
data_end = size
if rbl.has_part_table:
data_end = size - 0xC0 # do not encrypt the partition table
# set RBL size including one 16-byte padding
rbl.raw_size = align_up(size + 16, 32) + 16
# encrypt the input file, padded to 32 bytes
data_crypt_gen = self.crypt(
addr, fileiter(f, size=32, padding=0xFF, count=data_end)
)
# iterate over encrypted 32-byte blocks
for block in geniter(data_crypt_gen, 32):
# add CRC16 and yield
yield from self.crc(block, type_binary)
crc_total += 2
rbl.update(block)
# temporary buffer for small-size operations
buf = b"\xff" * 16 # add 16 bytes of padding
if rbl.has_part_table:
# add an unencrypted partition table
buf += f.read(0xC0)
# update RBL
rbl.update(buf)
# add last padding with different values
rbl.update(b"\x10" * 16)
# add last padding with normal values
buf += b"\xff" * 16
# yield the temporary buffer
yield from self.crc(buf, type_binary)
crc_total += 2 * (len(buf) // 32)
# pad the entire container with 0xFF, excluding RBL and its CRC16
pad_size = pad_up(rbl.data_size + crc_total, rbl.container_size_crc) - 102
if type_padding:
yield (type_padding, pad_size)
else:
for _ in range(pad_size):
yield b"\xff"
# yield RBL with CRC16
yield from self.crc(rbl.serialize(), type_rbl)
def auto_int(x):
return int(x, 0)
def add_common_args(parser):
parser.add_argument(
"coeffs", type=str, help="Encryption coefficients (hex string, 32 chars)"
)
parser.add_argument("input", type=FileType("rb"), help="Input file")
parser.add_argument("output", type=FileType("wb"), help="Output file")
parser.add_argument("addr", type=auto_int, help="Memory address (dec/hex)")
if __name__ == "__main__":
parser = ArgumentParser(description="Encrypt/decrypt Beken firmware binaries")
sub = parser.add_subparsers(dest="action", required=True)
encrypt = sub.add_parser("encrypt", help="Encrypt binary files without packaging")
add_common_args(encrypt)
encrypt.add_argument("-c", "--crc", help="Include CRC16", action="store_true")
decrypt = sub.add_parser("decrypt", description="Decrypt unpackaged binary files")
add_common_args(decrypt)
decrypt.add_argument(
"-C",
"--no-crc-check",
help="Do not check CRC16 (if present)",
action="store_true",
)
package = sub.add_parser(
"package", description="Package raw binary files as RBL containers"
)
add_common_args(package)
package.add_argument(
"size", type=auto_int, help="RBL total size (excl. CRC) (dec/hex)"
)
package.add_argument(
"-n",
"--name",
type=str,
help="Firmware name (default: app)",
default="app",
required=False,
)
package.add_argument(
"-v",
"--version",
type=str,
help="Firmware version (default: 1.00)",
default="1.00",
required=False,
)
unpackage = sub.add_parser(
"unpackage", description="Unpackage a single RBL container"
)
add_common_args(unpackage)
unpackage.add_argument(
"offset", type=auto_int, help="Offset in input file (dec/hex)"
)
unpackage.add_argument(
"size", type=auto_int, help="Container total size (incl. CRC) (dec/hex)"
)
args = parser.parse_args()
bk = BekenBinary(args.coeffs)
f: FileIO = args.input
size = stat(args.input.name).st_size
start = time()
if args.action == "encrypt":
print(f"Encrypting '{f.name}' ({size} bytes)")
if args.crc:
print(f" - calculating 32-byte block CRC16...")
gen = bk.crc(bk.crypt(args.addr, f))
else:
print(f" - as raw binary, without CRC16...")
gen = bk.crypt(args.addr, f)
if args.action == "decrypt":
print(f"Decrypting '{f.name}' ({size} bytes)")
if size % 34 == 0:
if args.no_crc_check:
print(f" - has CRC16, skipping checks...")
else:
print(f" - has CRC16, checking...")
gen = bk.crypt(args.addr, bk.uncrc(f, check=not args.no_crc_check))
elif size % 4 != 0:
raise ValueError("Input file has invalid length")
else:
print(f" - raw binary, no CRC")
gen = bk.crypt(args.addr, f)
if args.action == "package":
print(f"Packaging {args.name} '{f.name}' for memory address 0x{args.addr:X}")
rbl = RBL(name=args.name, version=args.version)
if args.name == "bootloader":
rbl.has_part_table = True
print(f" - in bootloader mode; partition table unencrypted")
rbl.container_size = args.size
print(f" - container size (excl. CRC): 0x{rbl.container_size:X}")
print(f" - container size (incl. CRC): 0x{rbl.container_size_crc:X}")
gen = bk.package(f, args.addr, size, rbl)
if args.action == "unpackage":
print(f"Unpackaging '{f.name}' (at 0x{args.offset:X}, size 0x{args.size:X})")
f.seek(args.offset + args.size - 102, SEEK_SET)
rbl = f.read(102)
rbl = b"".join(bk.uncrc(rbl))
rbl = RBL.deserialize(rbl)
print(f" - found '{rbl.name}' ({rbl.version}), size {rbl.data_size}")
f.seek(0, SEEK_SET)
crc_size = (rbl.data_size - 16) // 32 * 34
gen = bk.crypt(args.addr, bk.uncrc(fileiter(f, 32, 0xFF, crc_size)))
written = 0
for data in gen:
args.output.write(data)
written += len(data)
print(f" - wrote {written} bytes in {time()-start:.3f} s")

View File

@@ -1,133 +0,0 @@
# Copyright (c) Kuba Szczodrzyński 2022-06-02.
from enum import Enum
from typing import List
class CRC16(Enum):
# based on https://crccalc.com/ and https://reveng.sourceforge.io/crc-catalogue/16.htm
ANSI = dict(poly=0x8005, init=0x0000, ref=False, out=0x0000)
ARC = dict(poly=0x8005, init=0x0000, ref=True, out=0x0000)
AUG_CCITT = dict(poly=0x1021, init=0x1D0F, ref=False, out=0x0000)
AUTOSAR = dict(poly=0x1021, init=0xFFFF, ref=False, out=0x0000)
BUYPASS = dict(poly=0x8005, init=0x0000, ref=False, out=0x0000)
CCITT = dict(poly=0x1021, init=0x0000, ref=True, out=0x0000)
CCITT_FALSE = dict(poly=0x1021, init=0xFFFF, ref=False, out=0x0000)
CCITT_TRUE = dict(poly=0x1021, init=0x0000, ref=True, out=0x0000)
CDMA2000 = dict(poly=0xC867, init=0xFFFF, ref=False, out=0x0000)
CMS = dict(poly=0x8005, init=0xFFFF, ref=False, out=0x0000)
CRC_A = dict(poly=0x1021, init=0xC6C6, ref=True, out=0x0000)
CRC_B = dict(poly=0x1021, init=0xFFFF, ref=True, out=0xFFFF)
DARC = dict(poly=0x1021, init=0xFFFF, ref=False, out=0xFFFF)
DDS_110 = dict(poly=0x8005, init=0x800D, ref=False, out=0x0000)
DECT_R = dict(poly=0x0589, init=0x0000, ref=False, out=0x0001)
DECT_X = dict(poly=0x0589, init=0x0000, ref=False, out=0x0000)
DNP = dict(poly=0x3D65, init=0x0000, ref=True, out=0xFFFF)
EN_13757 = dict(poly=0x3D65, init=0x0000, ref=False, out=0xFFFF)
EPC = dict(poly=0x1021, init=0xFFFF, ref=False, out=0xFFFF)
EPC_C1G2 = dict(poly=0x1021, init=0xFFFF, ref=False, out=0xFFFF)
GENIBUS = dict(poly=0x1021, init=0xFFFF, ref=False, out=0xFFFF)
GSM = dict(poly=0x1021, init=0x0000, ref=False, out=0xFFFF)
I_CODE = dict(poly=0x1021, init=0xFFFF, ref=False, out=0xFFFF)
IBM = dict(poly=0x8005, init=0x0000, ref=False, out=0x0000)
IBM_3740 = dict(poly=0x1021, init=0xFFFF, ref=False, out=0x0000)
IBM_SDLC = dict(poly=0x1021, init=0xFFFF, ref=True, out=0xFFFF)
IEC_61158_2 = dict(poly=0x1DCF, init=0xFFFF, ref=False, out=0xFFFF)
ISO_14443_3_A = dict(poly=0x1021, init=0xC6C6, ref=True, out=0x0000)
ISO_14443_3_B = dict(poly=0x1021, init=0xFFFF, ref=True, out=0xFFFF)
ISO_HDLC = dict(poly=0x1021, init=0xFFFF, ref=True, out=0xFFFF)
KERMIT = dict(poly=0x1021, init=0x0000, ref=True, out=0x0000)
LHA = dict(poly=0x8005, init=0x0000, ref=True, out=0x0000)
LJ1200 = dict(poly=0x6F63, init=0x0000, ref=False, out=0x0000)
M17 = dict(poly=0x5935, init=0xFFFF, ref=False, out=0x0000)
MAXIM = dict(poly=0x8005, init=0x0000, ref=True, out=0xFFFF)
MCRF4XX = dict(poly=0x1021, init=0xFFFF, ref=True, out=0x0000)
MODBUS = dict(poly=0x8005, init=0xFFFF, ref=True, out=0x0000)
NRSC_5 = dict(poly=0x080B, init=0xFFFF, ref=True, out=0x0000)
OPENSAFETY_A = dict(poly=0x5935, init=0x0000, ref=False, out=0x0000)
OPENSAFETY_B = dict(poly=0x755B, init=0x0000, ref=False, out=0x0000)
PROFIBUS = dict(poly=0x1DCF, init=0xFFFF, ref=False, out=0xFFFF)
RIELLO = dict(poly=0x1021, init=0xB2AA, ref=True, out=0x0000)
SPI_FUJITSU = dict(poly=0x1021, init=0x1D0F, ref=False, out=0x0000)
T10_DIF = dict(poly=0x8BB7, init=0x0000, ref=False, out=0x0000)
TELEDISK = dict(poly=0xA097, init=0x0000, ref=False, out=0x0000)
TMS37157 = dict(poly=0x1021, init=0x89EC, ref=True, out=0x0000)
UMTS = dict(poly=0x8005, init=0x0000, ref=False, out=0x0000)
USB = dict(poly=0x8005, init=0xFFFF, ref=True, out=0xFFFF)
V_41_LSB = dict(poly=0x1021, init=0x0000, ref=True, out=0x0000)
VERIFONE = dict(poly=0x8005, init=0x0000, ref=False, out=0x0000)
X_25 = dict(poly=0x1021, init=0xFFFF, ref=True, out=0xFFFF)
XMODEM = dict(poly=0x1021, init=0x0000, ref=False, out=0x0000)
poly: int
init: int
ref: bool
out: int
table: List[int]
def __init__(self, params: dict) -> None:
super().__init__()
self.poly = params["poly"]
self.init = params["init"]
self.ref = params["ref"]
self.out = params["out"]
self.table = None
if self.ref:
self.poly = self.reverse16(self.poly)
self.init = self.reverse16(self.init)
@staticmethod
def reverse16(num: int) -> int:
out = 0
for i in range(16):
out |= ((num & (1 << i)) >> i) << (15 - i)
return out
def calc(self, data: bytes) -> int:
if self.ref:
self._init_ref()
return self._calc_ref(data)
self._init_std()
return self._calc_std(data)
def _init_std(self):
if self.table:
return
self.table = []
for b in range(256):
crc = b << 8
for _ in range(8):
if crc & 0x8000:
crc <<= 1
crc ^= self.poly
else:
crc <<= 1
self.table.append(crc & 0xFFFF)
def _init_ref(self):
if self.table:
return
self.table = []
for b in range(256):
crc = b
for _ in range(8):
if crc & 0x0001:
crc >>= 1
crc ^= self.poly
else:
crc >>= 1
self.table.append(crc)
def _calc_std(self, data: bytes) -> int:
crc = self.init
for b in data:
b ^= crc // 256
crc = self.table[b] ^ (crc * 256 % 0x10000)
return crc ^ self.out
def _calc_ref(self, data: bytes) -> int:
crc = self.init
for b in data:
b ^= crc % 256
crc = self.table[b] ^ (crc // 256)
return crc ^ self.out

View File

@@ -1,78 +0,0 @@
# Copyright (c) Kuba Szczodrzyński 2022-06-10.
import json
from io import BytesIO
from os.path import dirname, getmtime, isfile, join
from typing import List, Union
def chname(path: str, name: str) -> str:
"""Change the basename of 'path' to 'name'."""
return join(dirname(path), name)
def chext(path: str, ext: str) -> str:
"""Change the file extension of 'path' to 'ext' (without the dot)."""
return path.rpartition(".")[0] + "." + ext
def isnewer(what: str, than: str) -> bool:
"""Check if 'what' is newer than 'than'.
Returns False if 'what' is not a file.
Returns True if 'than' is not a file.
"""
if not isfile(what):
return False
if not isfile(than):
return True
return getmtime(what) > getmtime(than)
def readbin(file: str) -> bytes:
"""Read a binary file into a bytes object."""
with open(file, "rb") as f:
data = f.read()
return data
def writebin(file: str, data: Union[bytes, BytesIO]):
"""Write data into a binary file."""
with open(file, "wb") as f:
if isinstance(data, BytesIO):
f.write(data.getvalue())
else:
f.write(data)
# same as load_json
def readjson(file: str) -> Union[dict, list]:
"""Read a JSON file into a dict or list."""
with open(file, "r", encoding="utf-8") as f:
return json.load(f)
def writejson(file: str, data: Union[dict, list]):
"""Write a dict or list to a JSON file."""
with open(file, "w", encoding="utf-8") as f:
json.dump(data, f)
def readtext(file: str) -> str:
"""Read a text file into a string."""
with open(file, "r", encoding="utf-8") as f:
data = f.read()
return data
def writetext(file: str, data: Union[str, bytes, List[str]]):
"""Write data into a text file."""
with open(file, "w", encoding="utf-8") as f:
if isinstance(data, bytes):
f.write(data.decode())
elif isinstance(data, list):
f.write("\n".join(data))
f.write("\n")
else:
f.write(data)

View File

@@ -1,204 +0,0 @@
# Copyright (c) Kuba Szczodrzyński 2022-06-02.
from io import FileIO
from typing import IO, Generator, Union
ByteGenerator = Generator[bytes, None, None]
def bswap(data: bytes) -> bytes:
"""Reverse the byte array (big-endian <-> little-endian)."""
return bytes(reversed(data))
def betoint(data: bytes) -> int:
"""Convert bytes to big-endian unsigned integer."""
return int.from_bytes(data, byteorder="big")
def letoint(data: bytes) -> int:
"""Convert bytes to little-endian unsigned integer."""
return int.from_bytes(data, byteorder="little")
def betosint(data: bytes) -> int:
"""Convert bytes to big-endian signed integer."""
return int.from_bytes(data, byteorder="big", signed=True)
def letosint(data: bytes) -> int:
"""Convert bytes to little-endian signed integer."""
return int.from_bytes(data, byteorder="little", signed=True)
def inttobe32(data: int) -> bytes:
"""Convert unsigned integer to 32 bits, big-endian."""
return data.to_bytes(length=4, byteorder="big")
def inttole32(data: int) -> bytes:
"""Convert unsigned integer to 32 bits, little-endian."""
return data.to_bytes(length=4, byteorder="little")
def inttobe24(data: int) -> bytes:
"""Convert unsigned integer to 24 bits, big-endian."""
return data.to_bytes(length=3, byteorder="big")
def inttole24(data: int) -> bytes:
"""Convert unsigned integer to 24 bits, little-endian."""
return data.to_bytes(length=3, byteorder="little")
def inttobe16(data: int) -> bytes:
"""Convert unsigned integer to 16 bits, big-endian."""
return data.to_bytes(length=2, byteorder="big")
def inttole16(data: int) -> bytes:
"""Convert unsigned integer to 16 bits, little-endian."""
return data.to_bytes(length=2, byteorder="little")
def intto8(data: int) -> bytes:
"""Convert unsigned integer to 8 bits."""
return data.to_bytes(length=1, byteorder="big")
def sinttobe32(data: int) -> bytes:
"""Convert signed integer to 32 bits, big-endian."""
return data.to_bytes(length=4, byteorder="big", signed=True)
def sinttole32(data: int) -> bytes:
"""Convert signed integer to 32 bits, little-endian."""
return data.to_bytes(length=4, byteorder="little", signed=True)
def sinttobe24(data: int) -> bytes:
"""Convert signed integer to 24 bits, big-endian."""
return data.to_bytes(length=3, byteorder="big", signed=True)
def sinttole24(data: int) -> bytes:
"""Convert signed integer to 24 bits, little-endian."""
return data.to_bytes(length=3, byteorder="little", signed=True)
def sinttobe16(data: int) -> bytes:
"""Convert signed integer to 16 bits, big-endian."""
return data.to_bytes(length=2, byteorder="big", signed=True)
def sinttole16(data: int) -> bytes:
"""Convert signed integer to 16 bits, little-endian."""
return data.to_bytes(length=2, byteorder="little", signed=True)
def sintto8(data: int) -> bytes:
"""Convert signed integer to 8 bits."""
return data.to_bytes(length=1, byteorder="little", signed=True)
def align_up(x: int, n: int) -> int:
"""Return x aligned up to block size of n."""
return int((x - 1) // n + 1) * n
def align_down(x: int, n: int) -> int:
"""Return 'x' aligned down to block size of 'n'."""
return int(x // n) * n
def pad_up(x: int, n: int) -> int:
"""Return how many bytes of padding is needed to align 'x'
up to block size of 'n'."""
return n - (x % n)
def pad_data(data: bytes, n: int, char: int) -> bytes:
"""Add 'char'-filled padding to 'data' to align to a 'n'-sized block."""
if len(data) % n == 0:
return data
return data + (bytes([char]) * pad_up(len(data), n))
def uint8(val):
"""Get only the least-significant 8 bits of the value."""
return val & 0xFF
def uint16(val):
"""Get only the least-significant 16 bits of the value."""
return val & 0xFFFF
def uint32(val):
"""Get only the least-significant 32 bits of the value."""
return val & 0xFFFFFFFF
def uintmax(bits: int) -> int:
"""Get maximum integer size for given bit width."""
return (2**bits) - 1
def biniter(data: bytes, size: int) -> ByteGenerator:
"""Iterate over 'data' in 'size'-bytes long chunks, returning
a generator."""
if len(data) % size != 0:
raise ValueError(
f"Data length must be a multiple of block size ({len(data)} % {size})"
)
for i in range(0, len(data), size):
yield data[i : i + size]
def geniter(gen: Union[ByteGenerator, bytes, IO], size: int) -> ByteGenerator:
"""
Take data from 'gen' and generate 'size'-bytes long chunks.
If 'gen' is a bytes or IO object, it is wrapped using
biniter() or fileiter().
"""
if isinstance(gen, bytes):
yield from biniter(gen, size)
return
if isinstance(gen, IO):
yield from fileiter(gen, size)
return
buf = b""
for part in gen:
if not buf and len(part) == size:
yield part
continue
buf += part
while len(buf) >= size:
yield buf[0:size]
buf = buf[size:]
def fileiter(
f: FileIO, size: int, padding: int = 0x00, count: int = 0
) -> ByteGenerator:
"""
Read data from 'f' and generate 'size'-bytes long chunks.
Pad incomplete chunks with 'padding' character.
Read up to 'count' bytes from 'f', if specified. Data is padded
if not on chunk boundary.
"""
read = 0
while True:
if count and read + size >= count:
yield pad_data(f.read(count % size), size, padding)
return
data = f.read(size)
read += len(data)
if len(data) < size:
# got only part of the block
yield pad_data(data, size, padding)
return
yield data

View File

@@ -1,60 +0,0 @@
# Copyright (c) Kuba Szczodrzyński 2022-06-02.
from os.path import dirname, isdir, join
from typing import List
class Family:
id: int
short_name: str
description: str
name: str = None
parent: str = None
code: str = None
parent_code: str = None
url: str = None
sdk: str = None
framework: str = None
mcus: List[str] = []
def __init__(self, data: dict):
for key, value in data.items():
if key == "id":
self.id = int(value, 16)
else:
setattr(self, key, value)
@property
def sdk_name(self) -> str:
return self.sdk.rpartition("/")[2] if self.sdk else None
@property
def has_arduino_core(self) -> bool:
if not self.name:
return False
if isdir(join(dirname(__file__), "..", "..", "arduino", self.name)):
return True
if not self.parent:
return False
if isdir(join(dirname(__file__), "..", "..", "arduino", self.parent)):
return True
return False
def dict(self) -> dict:
return dict(
FAMILY=self.short_name,
FAMILY_ID=self.id,
FAMILY_NAME=self.name,
FAMILY_PARENT=self.parent,
FAMILY_CODE=self.code,
FAMILY_PARENT_CODE=self.parent_code,
)
def __eq__(self, __o: object) -> bool:
return isinstance(__o, Family) and self.id == __o.id
def __iter__(self):
return iter(self.dict().items())
def __repr__(self) -> str:
return f"<Family: {self.short_name}(0x{self.id:X}), name={self.name}, parent={self.parent}>"

View File

@@ -1,67 +0,0 @@
# Copyright (c) Kuba Szczodrzyński 2022-06-02.
import json
from typing import Tuple, Union
SliceLike = Union[slice, str, int]
def merge_dicts(d1, d2):
if d1 is not None and type(d1) != type(d2):
raise TypeError("d1 and d2 are different types")
if isinstance(d2, list):
if d1 is None:
d1 = []
d1.extend(merge_dicts(None, item) for item in d2)
elif isinstance(d2, dict):
if d1 is None:
d1 = {}
for key in d2:
d1[key] = merge_dicts(d1.get(key, None), d2[key])
else:
d1 = d2
return d1
def load_json(file: str) -> Union[dict, list]:
with open(file, "r", encoding="utf-8") as f:
return json.load(f)
def get(data: dict, path: str):
if not isinstance(data, dict) or not path:
return None
if "." not in path:
return data.get(path, None)
key, _, path = path.partition(".")
return get(data.get(key, None), path)
def slice2int(val: SliceLike) -> Tuple[int, int]:
"""Convert a slice-like value (slice, string '7:0' or '3', int '3')
to a tuple of (start, stop)."""
if isinstance(val, int):
return (val, val)
if isinstance(val, slice):
if val.step:
raise ValueError("value must be a slice without step")
if val.start < val.stop:
raise ValueError("start must not be less than stop")
return (val.start, val.stop)
if isinstance(val, str):
if ":" in val:
val = val.split(":")
if len(val) == 2:
return tuple(map(int, val))
elif val.isnumeric():
return (int(val), int(val))
raise ValueError(f"invalid slice format: {val}")
# https://stackoverflow.com/a/1094933/9438331
def sizeof(num: int, suffix="iB", base=1024.0) -> str:
for unit in ["", "K", "M", "G", "T", "P", "E", "Z"]:
if abs(num) < base:
return f"{num:.1f} {unit}{suffix}".replace(".0 ", " ")
num /= base
return f"{num:.1f} Y{suffix}".replace(".0 ", " ")

View File

@@ -1,75 +0,0 @@
# Copyright (c) Kuba Szczodrzyński 2022-06-02.
from glob import glob
from os.path import basename, dirname, isfile, join
from typing import Dict, List, Union
from tools.util.models import Family
from tools.util.obj import load_json, merge_dicts
boards_base: Dict[str, dict] = {}
families: List[Family] = []
def get_board_list() -> List[str]:
boards_glob = join(dirname(__file__), "..", "..", "boards", "*.json")
return [basename(file)[:-5] for file in glob(boards_glob)]
def get_board_manifest(board: Union[str, dict]) -> dict:
boards_dir = join(dirname(__file__), "..", "..", "boards")
if not isinstance(board, dict):
if not isfile(board):
board = join(boards_dir, f"{board}.json")
board = load_json(board)
if "_base" in board:
base = board["_base"]
if not isinstance(base, list):
base = [base]
result = {}
for base_name in base:
if base_name not in boards_base:
file = join(boards_dir, "_base", f"{base_name}.json")
boards_base[base_name] = load_json(file)
merge_dicts(result, boards_base[base_name])
merge_dicts(result, board)
board = result
return board
def get_families() -> List[Family]:
global families
if families:
return families
file = join(dirname(__file__), "..", "..", "families.json")
families = [Family(f) for f in load_json(file)]
return families
def get_family(
any: str = None,
id: Union[str, int] = None,
short_name: str = None,
name: str = None,
code: str = None,
) -> Family:
if any:
id = any
short_name = any
name = any
code = any
if id and isinstance(id, str) and id.startswith("0x"):
id = int(id, 16)
for family in get_families():
if id and family.id == id:
return family
if short_name and family.short_name == short_name.upper():
return family
if name and family.name == name.lower():
return family
if code and family.code == code.lower():
return family
if any:
raise ValueError(f"Family not found - {any}")
text = ", ".join(filter(None, [id, short_name, name, code]))
raise ValueError(f"Family not found - {text}")