Initial implementation of XAP protocol.

This commit is contained in:
Nick Brassel 2021-08-11 21:08:32 +10:00
parent f4c447f2df
commit eba91c6e28
34 changed files with 1934 additions and 4 deletions

View file

81
lib/python/qmk/xap/common.py Executable file
View file

@ -0,0 +1,81 @@
"""This script handles the XAP protocol data files.
"""
import re
import hjson
from typing import OrderedDict
from qmk.constants import QMK_FIRMWARE
def _merge_ordered_dicts(dicts):
"""Merges nested OrderedDict objects resulting from reading a hjson file.
Later input dicts overrides earlier dicts for plain values.
Arrays will be appended. If the first entry of an array is "!reset!", the contents of the array will be cleared and replaced with RHS.
Dictionaries will be recursively merged. If any entry is "!reset!", the contents of the dictionary will be cleared and replaced with RHS.
"""
result = OrderedDict()
def add_entry(target, k, v):
if k in target and isinstance(v, OrderedDict):
if "!reset!" in v:
target[k] = v
else:
target[k] = _merge_ordered_dicts([target[k], v])
if "!reset!" in target[k]:
del target[k]["!reset!"]
elif k in target and isinstance(v, list):
if v[0] == '!reset!':
target[k] = v[1:]
else:
target[k] = target[k] + v
else:
target[k] = v
for d in dicts:
for (k, v) in d.items():
add_entry(result, k, v)
return result
def get_xap_definition_files():
"""Get the sorted list of XAP definition files, from <QMK>/data/xap.
"""
xap_defs = QMK_FIRMWARE / "data" / "xap"
return list(sorted(xap_defs.glob('**/xap_*.hjson')))
def update_xap_definitions(original, new):
"""Creates a new XAP definition object based on an original and the new supplied object.
Both inputs must be of type OrderedDict.
Later input dicts overrides earlier dicts for plain values.
Arrays will be appended. If the first entry of an array is "!reset!", the contents of the array will be cleared and replaced with RHS.
Dictionaries will be recursively merged. If any entry is "!reset!", the contents of the dictionary will be cleared and replaced with RHS.
"""
if original is None:
original = OrderedDict()
return _merge_ordered_dicts([original, new])
def latest_xap_defs():
"""Gets the latest version of the XAP definitions.
"""
definitions = [hjson.load(file.open(encoding='utf-8')) for file in get_xap_definition_files()]
return _merge_ordered_dicts(definitions)
def route_conditions(route_stack):
"""Handles building the C preprocessor conditional based on the current route.
"""
conditions = []
for route in route_stack:
if 'enable_if_preprocessor' in route:
conditions.append(route['enable_if_preprocessor'])
if len(conditions) == 0:
return None
return "(" + ' && '.join([f'({c})' for c in conditions]) + ")"

View file

View file

@ -0,0 +1,103 @@
"""This script generates the XAP protocol documentation.
"""
import hjson
from qmk.constants import QMK_FIRMWARE
from qmk.xap.common import get_xap_definition_files, update_xap_definitions
def _update_type_docs(overall):
defs = overall['type_docs']
type_docs = []
for (k, v) in sorted(defs.items(), key=lambda x: x[0]):
type_docs.append(f'| _{k}_ | {v} |')
desc_str = "\n".join(type_docs)
overall['documentation']['!type_docs!'] = f'''\
| Name | Definition |
| -- | -- |
{desc_str}
'''
def _update_term_definitions(overall):
defs = overall['term_definitions']
term_descriptions = []
for (k, v) in sorted(defs.items(), key=lambda x: x[0]):
term_descriptions.append(f'| _{k}_ | {v} |')
desc_str = "\n".join(term_descriptions)
overall['documentation']['!term_definitions!'] = f'''\
| Name | Definition |
| -- | -- |
{desc_str}
'''
def _update_response_flags(overall):
flags = overall['response_flags']['bits']
for n in range(0, 8):
if str(n) not in flags:
flags[str(n)] = {"name": "-", "description": "-"}
header = '| ' + " | ".join([f'Bit {n}' for n in range(7, -1, -1)]) + ' |'
dividers = '|' + "|".join(['--' for n in range(7, -1, -1)]) + '|'
bit_names = '| ' + " | ".join([flags[str(n)]['name'] for n in range(7, -1, -1)]) + ' |'
bit_descriptions = ''
for n in range(7, -1, -1):
bit_desc = flags[str(n)]
if bit_desc['name'] != '-':
desc = bit_desc['description']
bit_descriptions = bit_descriptions + f'\n* `Bit {n}`: {desc}'
overall['documentation']['!response_flags!'] = f'''\
{header}
{dividers}
{bit_names}
{bit_descriptions}
'''
def generate_docs():
"""Generates the XAP protocol documentation by merging the definitions files, and producing the corresponding Markdown document under `/docs/`.
"""
docs_list = []
overall = None
for file in get_xap_definition_files():
overall = update_xap_definitions(overall, hjson.load(file.open(encoding='utf-8')))
try:
if 'type_docs' in overall:
_update_type_docs(overall)
if 'term_definitions' in overall:
_update_term_definitions(overall)
if 'response_flags' in overall:
_update_response_flags(overall)
except:
print(hjson.dumps(overall))
exit(1)
output_doc = QMK_FIRMWARE / "docs" / f"{file.stem}.md"
docs_list.append(output_doc)
with open(output_doc, "w", encoding='utf-8') as out_file:
for e in overall['documentation']['order']:
out_file.write(overall['documentation'][e].strip())
out_file.write('\n\n')
output_doc = QMK_FIRMWARE / "docs" / f"xap_protocol.md"
with open(output_doc, "w", encoding='utf-8') as out_file:
out_file.write('''\
# XAP Protocol Reference
''')
for file in reversed(sorted(docs_list)):
ver = file.stem[4:]
out_file.write(f'* [XAP Version {ver}]({file.name})\n')

View file

@ -0,0 +1,136 @@
"""This script generates the XAP protocol generated header to be compiled into QMK.
"""
import re
import pyhash
from qmk.commands import get_git_version
from qmk.constants import GPL2_HEADER_C_LIKE, GENERATED_HEADER_C_LIKE
from qmk.xap.common import latest_xap_defs, route_conditions
def _append_route_defines(lines, container, container_id=None, route_stack=None):
"""Handles building the list of the XAP routes, combining parent and child names together, as well as the route number.
"""
if route_stack is None:
route_stack = [container]
else:
route_stack.append(container)
route_name = '_'.join([r['define'] for r in route_stack])
if container_id:
lines.append(f'#define {route_name} {container_id}')
if 'routes' in container:
for route_id in container['routes']:
route = container['routes'][route_id]
_append_route_defines(lines, route, route_id, route_stack)
route_stack.pop()
def _append_route_masks(lines, container, container_id=None, route_stack=None):
"""Handles creating the equivalent XAP route masks, for capabilities checks. Forces value of `0` if disabled in the firmware.
"""
if route_stack is None:
route_stack = [container]
else:
route_stack.append(container)
route_name = '_'.join([r['define'] for r in route_stack])
condition = route_conditions(route_stack)
if container_id:
if condition:
lines.append('')
lines.append(f'#if {condition}')
lines.append(f'#define {route_name}_MASK (1ul << ({route_name}))')
if condition:
lines.append(f'#else // {condition}')
lines.append(f'#define {route_name}_MASK 0')
lines.append(f'#endif // {condition}')
lines.append('')
if 'routes' in container:
for route_id in container['routes']:
route = container['routes'][route_id]
_append_route_masks(lines, route, route_id, route_stack)
route_stack.pop()
def _append_route_capabilities(lines, container, container_id=None, route_stack=None):
"""Handles creating the equivalent XAP route masks, for capabilities checks. Forces value of `0` if disabled in the firmware.
"""
if route_stack is None:
route_stack = [container]
else:
route_stack.append(container)
route_name = '_'.join([r['define'] for r in route_stack])
if 'routes' in container:
lines.append('')
lines.append(f'#define {route_name}_CAPABILITIES (0 \\')
if 'routes' in container:
for route_id in container['routes']:
route = container['routes'][route_id]
route_stack.append(route)
child_name = '_'.join([r['define'] for r in route_stack])
lines.append(f' | ({child_name}_MASK) \\')
route_stack.pop()
lines.append(' )')
if 'routes' in container:
for route_id in container['routes']:
route = container['routes'][route_id]
_append_route_capabilities(lines, route, route_id, route_stack)
route_stack.pop()
def generate_header(output_file, keyboard):
"""Generates the XAP protocol header file, generated during normal build.
"""
xap_defs = latest_xap_defs()
# Preamble
lines = [GPL2_HEADER_C_LIKE, GENERATED_HEADER_C_LIKE, '#pragma once', '']
# Versions
prog = re.compile(r'^(\d+)\.(\d+)\.(\d+)')
b = prog.match(xap_defs['version'])
lines.append(f'#define XAP_BCD_VERSION 0x{int(b.group(1)):02d}{int(b.group(2)):02d}{int(b.group(3)):04d}ul')
b = prog.match(get_git_version())
lines.append(f'#define QMK_BCD_VERSION 0x{int(b.group(1)):02d}{int(b.group(2)):02d}{int(b.group(3)):04d}ul')
keyboard_id = pyhash.murmur3_32()(keyboard)
lines.append(f'#define XAP_KEYBOARD_IDENTIFIER 0x{keyboard_id:08X}ul')
lines.append('')
# Append the route and command defines
_append_route_defines(lines, xap_defs)
lines.append('')
_append_route_masks(lines, xap_defs)
lines.append('')
_append_route_capabilities(lines, xap_defs)
lines.append('')
# Generate the full output
xap_generated_inl = '\n'.join(lines)
# Clean up newlines
while "\n\n\n" in xap_generated_inl:
xap_generated_inl = xap_generated_inl.replace("\n\n\n", "\n\n")
if output_file:
if output_file.name == '-':
print(xap_generated_inl)
else:
output_file.parent.mkdir(parents=True, exist_ok=True)
if output_file.exists():
output_file.replace(output_file.parent / (output_file.name + '.bak'))
output_file.write_text(xap_generated_inl)

View file

@ -0,0 +1,222 @@
"""This script generates the XAP protocol generated header to be compiled into QMK.
"""
import pyhash
from qmk.casing import to_snake
from qmk.constants import GPL2_HEADER_C_LIKE, GENERATED_HEADER_C_LIKE
from qmk.xap.common import latest_xap_defs, route_conditions
def _get_c_type(xap_type):
if xap_type == 'bool':
return 'bool'
elif xap_type == 'u8':
return 'uint8_t'
elif xap_type == 'u16':
return 'uint16_t'
elif xap_type == 'u32':
return 'uint32_t'
elif xap_type == 'u64':
return 'uint64_t'
elif xap_type == 'struct':
return 'struct'
elif xap_type == 'string':
return 'const char *'
return 'unknown'
def _get_route_type(container):
if 'routes' in container:
return 'XAP_ROUTE'
elif 'return_constant' in container:
if container['return_type'] == 'u32':
return 'XAP_VALUE'
elif container['return_type'] == 'struct':
return 'XAP_CONST_MEM'
elif container['return_type'] == 'string':
return 'XAP_CONST_MEM'
elif 'return_getter' in container:
if container['return_type'] == 'u32':
return 'XAP_GETTER'
return 'UNSUPPORTED'
def _append_routing_table_declaration(lines, container, container_id, route_stack):
route_stack.append(container)
route_name = to_snake('_'.join([r['define'] for r in route_stack]))
if 'routes' in container:
pass
elif 'return_constant' in container:
if container['return_type'] == 'u32':
pass
elif container['return_type'] == 'struct':
lines.append('')
lines.append(f'static const struct {route_name}_t {{')
for member in container['return_struct_members']:
member_type = _get_c_type(member['type'])
member_name = to_snake(member['name'])
lines.append(f' const {member_type} {member_name};')
lines.append(f'}} {route_name}_data PROGMEM = {{')
for constant in container['return_constant']:
lines.append(f' {constant},')
lines.append(f'}};')
elif container['return_type'] == 'string':
constant = container['return_constant']
lines.append('')
lines.append(f'static const char {route_name}_str[] PROGMEM = {constant};')
elif 'return_getter' in container:
if container['return_type'] == 'u32':
lines.append('')
lines.append(f'extern uint32_t {route_name}_getter(void);')
elif container['return_type'] == 'struct':
pass
route_stack.pop()
def _append_routing_table_entry_flags(lines, container, container_id, route_stack):
is_secure = 1 if ('secure' in container and container['secure'] is True) else 0
lines.append(f' .flags = {{')
lines.append(f' .type = {_get_route_type(container)},')
lines.append(f' .is_secure = {is_secure},')
lines.append(f' }},')
def _append_routing_table_entry_route(lines, container, container_id, route_stack):
route_name = to_snake('_'.join([r['define'] for r in route_stack]))
lines.append(f' .child_routes = {route_name}_table,')
lines.append(f' .child_routes_len = sizeof({route_name}_table)/sizeof(xap_route_t),')
def _append_routing_table_entry_u32value(lines, container, container_id, route_stack):
value = container['return_constant']
lines.append(f' .u32value = {value},')
def _append_routing_table_entry_u32getter(lines, container, container_id, route_stack):
route_name = to_snake('_'.join([r['define'] for r in route_stack]))
lines.append(f' .u32getter = &{route_name}_getter,')
def _append_routing_table_entry_const_data(lines, container, container_id, route_stack):
route_name = to_snake('_'.join([r['define'] for r in route_stack]))
lines.append(f' .const_data = &{route_name}_data,')
lines.append(f' .const_data_len = sizeof({route_name}_data),')
def _append_routing_table_entry_string(lines, container, container_id, route_stack):
route_name = to_snake('_'.join([r['define'] for r in route_stack]))
lines.append(f' .const_data = {route_name}_str,')
lines.append(f' .const_data_len = sizeof({route_name}_str) - 1,')
def _append_routing_table_entry(lines, container, container_id, route_stack):
route_stack.append(container)
route_name = '_'.join([r['define'] for r in route_stack])
condition = route_conditions(route_stack)
if condition:
lines.append(f'#if {condition}')
lines.append(f' [{route_name}] = {{')
_append_routing_table_entry_flags(lines, container, container_id, route_stack)
if 'routes' in container:
_append_routing_table_entry_route(lines, container, container_id, route_stack)
elif 'return_constant' in container:
if container['return_type'] == 'u32':
_append_routing_table_entry_u32value(lines, container, container_id, route_stack)
elif container['return_type'] == 'struct':
_append_routing_table_entry_const_data(lines, container, container_id, route_stack)
elif container['return_type'] == 'string':
_append_routing_table_entry_string(lines, container, container_id, route_stack)
elif 'return_getter' in container:
if container['return_type'] == 'u32':
_append_routing_table_entry_u32getter(lines, container, container_id, route_stack)
lines.append(f' }},')
if condition:
lines.append(f'#endif // {condition}')
route_stack.pop()
def _append_routing_tables(lines, container, container_id=None, route_stack=None):
"""Handles building the list of the XAP routes, combining parent and child names together, as well as the route number.
"""
if route_stack is None:
route_stack = [container]
else:
route_stack.append(container)
route_name = to_snake('_'.join([r['define'] for r in route_stack]))
condition = route_conditions(route_stack)
if 'routes' in container:
for route_id in container['routes']:
route = container['routes'][route_id]
_append_routing_tables(lines, route, route_id, route_stack)
for route_id in container['routes']:
route = container['routes'][route_id]
_append_routing_table_declaration(lines, route, route_id, route_stack)
lines.append('')
if condition:
lines.append(f'#if {condition}')
lines.append(f'static const xap_route_t {route_name}_table[] PROGMEM = {{')
for route_id in container['routes']:
route = container['routes'][route_id]
_append_routing_table_entry(lines, route, route_id, route_stack)
lines.append('};')
if condition:
lines.append(f'#endif // {condition}')
lines.append('')
route_stack.pop()
def generate_inline(output_file):
"""Generates the XAP protocol header file, generated during normal build.
"""
xap_defs = latest_xap_defs()
# Preamble
lines = [GPL2_HEADER_C_LIKE, GENERATED_HEADER_C_LIKE, '']
# Add all the generated code
_append_routing_tables(lines, xap_defs)
# Generate the full output
xap_generated_inl = '\n'.join(lines)
# Clean up newlines
while "\n\n\n" in xap_generated_inl:
xap_generated_inl = xap_generated_inl.replace("\n\n\n", "\n\n")
if output_file:
if output_file.name == '-':
print(xap_generated_inl)
else:
output_file.parent.mkdir(parents=True, exist_ok=True)
if output_file.exists():
output_file.replace(output_file.parent / (output_file.name + '.bak'))
output_file.write_text(xap_generated_inl)