Files
esphome/esphome/components/speaker/media_player/__init__.py
Jonathan Swoboda 45e61f100c [core] Replace USE_ESP_IDF with USE_ESP32 across components (#12673)
Co-authored-by: Claude <noreply@anthropic.com>
Co-authored-by: J. Nick Koston <nick@koston.org>
2025-12-27 11:59:55 -10:00

453 lines
14 KiB
Python

"""Speaker Media Player Setup."""
import hashlib
import logging
from pathlib import Path
from esphome import automation, external_files
import esphome.codegen as cg
from esphome.components import audio, esp32, media_player, network, ota, psram, speaker
import esphome.config_validation as cv
from esphome.const import (
CONF_BUFFER_SIZE,
CONF_FILE,
CONF_FILES,
CONF_FORMAT,
CONF_ID,
CONF_NUM_CHANNELS,
CONF_PATH,
CONF_RAW_DATA_ID,
CONF_SAMPLE_RATE,
CONF_SPEAKER,
CONF_TASK_STACK_IN_PSRAM,
CONF_TYPE,
CONF_URL,
)
from esphome.core import CORE, HexInt
from esphome.core.entity_helpers import inherit_property_from
from esphome.external_files import download_content
from esphome.final_validate import full_config
# Module-level logger, named after this module.
_LOGGER = logging.getLogger(__name__)
# Component metadata lists. NOTE(review): these are not read anywhere in this
# file; presumably they are consumed by the ESPHome component loader.
AUTO_LOAD = ["audio"]
DEPENDENCIES = ["network"]
CODEOWNERS = ["@kahrendt", "@synesthesiam"]
# Used in this file as the key into CORE.data and as the argument to
# external_files.compute_local_file_dir() for downloaded media files.
DOMAIN = "media_player"
# Values of the `type` key that select between the local and web file schemas.
TYPE_LOCAL = "local"
TYPE_WEB = "web"
# Configuration keys defined by this component (the remaining CONF_* names are
# imported from esphome.const).
CONF_ANNOUNCEMENT = "announcement"
CONF_ANNOUNCEMENT_PIPELINE = "announcement_pipeline"
CONF_CODEC_SUPPORT_ENABLED = "codec_support_enabled"
CONF_ENQUEUE = "enqueue"
CONF_MEDIA_FILE = "media_file"
CONF_MEDIA_PIPELINE = "media_pipeline"
CONF_ON_MUTE = "on_mute"
CONF_ON_UNMUTE = "on_unmute"
CONF_ON_VOLUME = "on_volume"
CONF_STREAM = "stream"
CONF_VOLUME_INCREMENT = "volume_increment"
CONF_VOLUME_INITIAL = "volume_initial"
CONF_VOLUME_MIN = "volume_min"
CONF_VOLUME_MAX = "volume_max"
# Code-generation handle for the `speaker` namespace nested in the esphome
# namespace; the classes and enum below are declared inside it.
speaker_ns = cg.esphome_ns.namespace("speaker")
# The media player class configured by CONFIG_SCHEMA and instantiated in to_code().
SpeakerMediaPlayer = speaker_ns.class_(
"SpeakerMediaPlayer",
media_player.MediaPlayer,
cg.Component,
)
# Type used for the generated ID of each pipeline entry (see PIPELINE_SCHEMA).
AudioPipeline = speaker_ns.class_("AudioPipeline")
AudioPipelineType = speaker_ns.enum("AudioPipelineType", is_class=True)
# Maps the pipeline names to the corresponding AudioPipelineType members.
AUDIO_PIPELINE_TYPE_ENUM = {
"MEDIA": AudioPipelineType.MEDIA,
"ANNOUNCEMENT": AudioPipelineType.ANNOUNCEMENT,
}
# Action class registered below as
# "media_player.speaker.play_on_device_media_file".
PlayOnDeviceMediaAction = speaker_ns.class_(
"PlayOnDeviceMediaAction",
automation.Action,
cg.Parented.template(SpeakerMediaPlayer),
)
# NOTE(review): declared here but not registered as an action in the visible
# part of this file.
StopStreamAction = speaker_ns.class_(
"StopStreamAction", automation.Action, cg.Parented.template(SpeakerMediaPlayer)
)
def _compute_local_file_path(value: dict) -> Path:
    """Return the local path under which the web file in ``value`` is stored.

    The file name is the first 8 hex characters of the SHA-256 digest of the
    configured URL; the directory is whatever
    ``external_files.compute_local_file_dir`` returns for ``DOMAIN``.
    """
    url_digest = hashlib.sha256(value[CONF_URL].encode()).hexdigest()
    local_path = external_files.compute_local_file_dir(DOMAIN) / url_digest[:8]
    _LOGGER.debug("_compute_local_file_path: base_dir=%s", local_path)
    return local_path
def _download_web_file(value):
    """Fetch the URL in ``value`` to its local path and return ``value`` as-is.

    Used as the last step of WEB_SCHEMA, so the configuration dict is passed
    through unchanged after the download has been requested.
    """
    destination = _compute_local_file_path(value)
    download_content(value[CONF_URL], destination)
    _LOGGER.debug("download_web_file: path=%s", destination)
    return value
def _get_supported_format_struct(pipeline, type):
    """Build a ``media_player.MediaPlayerSupportedFormat`` struct initializer.

    The struct carries the pipeline's configured format, sample rate and
    number of channels, the purpose selected by ``type`` ("MEDIA" or
    "ANNOUNCEMENT"), and a sample width of 2 bytes for every format but MP3.
    """
    fields = [media_player.MediaPlayerSupportedFormat]

    configured_format = pipeline[CONF_FORMAT]
    # Only these three formats get a "format" field; anything else adds none.
    for label, codec_name in (("FLAC", "flac"), ("MP3", "mp3"), ("WAV", "wav")):
        if configured_format == label:
            fields.append(("format", codec_name))
            break

    fields.append(("sample_rate", pipeline[CONF_SAMPLE_RATE]))
    fields.append(("num_channels", pipeline[CONF_NUM_CHANNELS]))

    # An unrecognised pipeline type leaves the "purpose" field out entirely.
    purpose_key = None
    if type == "MEDIA":
        purpose_key = "default"
    elif type == "ANNOUNCEMENT":
        purpose_key = "announcement"
    if purpose_key is not None:
        purpose = media_player.MEDIA_PLAYER_FORMAT_PURPOSE_ENUM[purpose_key]
        fields.append(("purpose", purpose))

    if configured_format != "MP3":
        fields.append(("sample_bytes", 2))

    return cg.StructInitializer(*fields)
def _file_schema(value):
    """Validate a ``file`` entry given either as a plain string or a typed dict.

    Strings go through the shorthand validator; everything else is handed
    directly to TYPED_FILE_SCHEMA.
    """
    validator = _validate_file_shorthand if isinstance(value, str) else TYPED_FILE_SCHEMA
    return validator(value)
def _read_audio_file_and_type(file_config):
    """Read a configured media file and detect its audio file type.

    Args:
        file_config: One entry of the ``files`` list. Its ``file`` value holds
            ``type`` plus ``path`` (local files) or ``url`` (web files).

    Returns:
        A ``(data, media_file_type)`` tuple: the raw bytes of the file and the
        matching ``audio.AUDIO_FILE_TYPE_ENUM`` entry. The entry is the "NONE"
        one when the detected extension is not a known wav/mp3/flac extension.

    Raises:
        cv.Invalid: If the file source type is neither local nor web.
    """
    conf_file = file_config[CONF_FILE]
    file_source = conf_file[CONF_TYPE]
    if file_source == TYPE_LOCAL:
        path = CORE.relative_config_path(conf_file[CONF_PATH])
    elif file_source == TYPE_WEB:
        path = _compute_local_file_path(conf_file)
    else:
        raise cv.Invalid("Unsupported file source")

    with open(path, "rb") as f:
        data = f.read()

    # Kept as a function-scope import, as in the original code.
    import puremagic

    file_type: str = puremagic.from_string(data)
    file_type = file_type.removeprefix(".")

    # Exact-match lookup. The previous `file_type in ("wav")` form tested for a
    # substring of the string "wav" (parentheses without a comma do not make a
    # tuple), so an empty extension or a fragment such as "a" or "fla" was
    # wrongly reported as WAV/FLAC.
    enum_key_by_extension = {
        "wav": "WAV",
        "mp3": "MP3",
        "mpeg": "MP3",
        "mpga": "MP3",
        "flac": "FLAC",
    }
    enum_key = enum_key_by_extension.get(file_type, "NONE")
    media_file_type = audio.AUDIO_FILE_TYPE_ENUM[enum_key]

    return data, media_file_type
def _validate_file_shorthand(value):
    """Expand a plain-string ``file`` value into its typed form and validate it.

    Strings starting with ``http://`` or ``https://`` become a web source with
    that URL; any other string becomes a local source with that path.
    """
    text = cv.string_strict(value)
    if text.startswith(("http://", "https://")):
        expanded = {CONF_TYPE: TYPE_WEB, CONF_URL: text}
    else:
        expanded = {CONF_TYPE: TYPE_LOCAL, CONF_PATH: text}
    return _file_schema(expanded)
def _validate_pipeline(config):
    """Final-validate one pipeline entry against the speaker it references.

    Returns the same ``config`` object that was passed in.
    """
    # Transcoder settings that were not set manually are inherited from the
    # speaker (channels first, then sample rate).
    for inherited_key in (CONF_NUM_CHANNELS, CONF_SAMPLE_RATE):
        inherit_property_from(inherited_key, CONF_SPEAKER)(config)

    # Check that the resulting transcoder settings are compatible with the
    # speaker.
    check_against_speaker = audio.final_validate_audio_schema(
        "speaker media_player",
        audio_device=CONF_SPEAKER,
        bits_per_sample=16,
        channels=config.get(CONF_NUM_CHANNELS),
        sample_rate=config.get(CONF_SAMPLE_RATE),
    )
    check_against_speaker(config)

    return config
def _validate_repeated_speaker(config):
    """Reject configurations whose two pipelines reference the same speaker.

    Raises:
        cv.Invalid: If both pipelines are configured and name the same speaker.
    """
    announcement = config.get(CONF_ANNOUNCEMENT_PIPELINE)
    if not announcement:
        return config

    media = config.get(CONF_MEDIA_PIPELINE)
    if not media:
        return config

    if announcement[CONF_SPEAKER] == media[CONF_SPEAKER]:
        raise cv.Invalid(
            "The announcement and media pipelines cannot use the same speaker. Use the `mixer` speaker component to create two source speakers."
        )

    return config
def _final_validate(config):
    """Resolve codec support for this player and validate its media files.

    The resolved flag is stored in
    ``CORE.data[DOMAIN][<player id>][CONF_CODEC_SUPPORT_ENABLED]``, where
    ``to_code`` reads it back.

    Args:
        config: The validated configuration of one speaker media player.

    Returns:
        ``config`` unchanged.

    Raises:
        cv.Invalid: If a configured media file has an unrecognised type, or is
            not a wav file while codec support is disabled.
    """
    # Default to using codec if psram is enabled
    use_codec = config.get(CONF_CODEC_SUPPORT_ENABLED)
    if use_codec is None:
        use_codec = psram.DOMAIN in full_config.get()

    conf_id = config[CONF_ID].id
    # Create the per-player entry on demand. Seeding it only through the
    # default of the outer setdefault() raised KeyError whenever
    # CORE.data[DOMAIN] already existed, e.g. for a second player.
    player_data = CORE.data.setdefault(DOMAIN, {}).setdefault(conf_id, {})
    player_data[CONF_CODEC_SUPPORT_ENABLED] = use_codec

    for file_config in config.get(CONF_FILES, []):
        _, media_file_type = _read_audio_file_and_type(file_config)
        # Enum entries are compared through their string form.
        detected = str(media_file_type)
        if detected == str(audio.AUDIO_FILE_TYPE_ENUM["NONE"]):
            raise cv.Invalid("Unsupported local media file")
        if not use_codec and detected != str(audio.AUDIO_FILE_TYPE_ENUM["WAV"]):
            # Only wav files are supported
            raise cv.Invalid(
                f"Unsupported local media file type, set {CONF_CODEC_SUPPORT_ENABLED} to true or convert the media file to wav"
            )

    return config
# Local file source: requires `path`, validated with cv.file_.
LOCAL_SCHEMA = cv.Schema(
{
cv.Required(CONF_PATH): cv.file_,
}
)
# Web file source: requires `url`; _download_web_file is listed after the
# dict schema and requests the download to the local path derived from the URL.
WEB_SCHEMA = cv.All(
{
cv.Required(CONF_URL): cv.url,
},
_download_web_file,
)
# Typed schema choosing between the local and web file schemas.
TYPED_FILE_SCHEMA = cv.typed_schema(
{
TYPE_LOCAL: LOCAL_SCHEMA,
TYPE_WEB: WEB_SCHEMA,
},
)
# One entry of the `files` list: an AudioFile ID, the file source (string
# shorthand or typed dict, see _file_schema) and a generated ID that to_code()
# uses for the progmem array holding the file's bytes.
MEDIA_FILE_TYPE_SCHEMA = cv.Schema(
{
cv.Required(CONF_ID): cv.declare_id(audio.AudioFile),
cv.Required(CONF_FILE): _file_schema,
cv.GenerateID(CONF_RAW_DATA_ID): cv.declare_id(cg.uint8),
}
)
# Shared by the announcement and media pipelines. Sample rate and channel
# count are optional here; _validate_pipeline inherits them from the speaker
# when they are not set manually.
PIPELINE_SCHEMA = cv.Schema(
{
cv.GenerateID(): cv.declare_id(AudioPipeline),
cv.Required(CONF_SPEAKER): cv.use_id(speaker.Speaker),
cv.Optional(CONF_FORMAT, default="FLAC"): cv.enum(audio.AUDIO_FILE_TYPE_ENUM),
cv.Optional(CONF_SAMPLE_RATE): cv.int_range(min=1),
cv.Optional(CONF_NUM_CHANNELS): cv.int_range(1, 2),
}
)
def _request_high_performance_networking(config):
    """Ask the network component for its high performance configuration.

    The speaker media player streams audio data, so the optimized WiFi and
    lwip settings are requested whether or not codec support is enabled.
    This runs as a config validation step so that the request is made before
    to_code() executes.

    Returns ``config`` unchanged.
    """
    network.require_high_performance_networking()
    return config
# Main configuration schema: the base media player schema extended with the
# options of this component, followed by the extra validators listed at the end.
CONFIG_SCHEMA = cv.All(
media_player.media_player_schema(SpeakerMediaPlayer).extend(
{
# The announcement pipeline is mandatory, the media pipeline is optional.
cv.Required(CONF_ANNOUNCEMENT_PIPELINE): PIPELINE_SCHEMA,
cv.Optional(CONF_MEDIA_PIPELINE): PIPELINE_SCHEMA,
cv.Optional(CONF_BUFFER_SIZE, default=1000000): cv.int_range(
min=4000, max=4000000
),
# No default here: when unset, _final_validate enables codec support only if
# the psram component is part of the full configuration.
cv.Optional(CONF_CODEC_SUPPORT_ENABLED): cv.boolean,
cv.Optional(CONF_FILES): cv.ensure_list(MEDIA_FILE_TYPE_SCHEMA),
# Only accepted when the psram component is configured.
cv.Optional(CONF_TASK_STACK_IN_PSRAM): cv.All(
cv.boolean, cv.requires_component(psram.DOMAIN)
),
cv.Optional(CONF_VOLUME_INCREMENT, default=0.05): cv.percentage,
cv.Optional(CONF_VOLUME_INITIAL, default=0.5): cv.percentage,
cv.Optional(CONF_VOLUME_MAX, default=1.0): cv.percentage,
cv.Optional(CONF_VOLUME_MIN, default=0.0): cv.percentage,
cv.Optional(CONF_ON_MUTE): automation.validate_automation(single=True),
cv.Optional(CONF_ON_UNMUTE): automation.validate_automation(single=True),
cv.Optional(CONF_ON_VOLUME): automation.validate_automation(single=True),
}
),
cv.only_on_esp32,
# Both pipelines must reference different speakers.
_validate_repeated_speaker,
_request_high_performance_networking,
)
# Final validation: each configured pipeline is checked against its speaker by
# _validate_pipeline (all other keys are allowed through via ALLOW_EXTRA), then
# _final_validate resolves codec support and checks the configured media files.
FINAL_VALIDATE_SCHEMA = cv.All(
cv.Schema(
{
cv.Optional(CONF_ANNOUNCEMENT_PIPELINE): _validate_pipeline,
cv.Optional(CONF_MEDIA_PIPELINE): _validate_pipeline,
},
extra=cv.ALLOW_EXTRA,
),
_final_validate,
)
async def to_code(config):
    """Generate the setup code for one speaker media player.

    Emits, in order: the codec defines (when codec support was resolved as
    enabled during final validation), the media player and component
    registration, buffer/stack/volume settings, the announcement and optional
    media pipeline speakers and formats, the mute/unmute/volume automations,
    and one progmem-backed ``audio.AudioFile`` per configured file.
    """
    player_data = CORE.data[DOMAIN][config[CONF_ID].id]
    if player_data[CONF_CODEC_SUPPORT_ENABLED]:
        # Compile all supported audio codecs
        for define in ("USE_AUDIO_FLAC_SUPPORT", "USE_AUDIO_MP3_SUPPORT"):
            cg.add_define(define, True)

    var = await media_player.new_media_player(config)
    await cg.register_component(var, config)

    ota.request_ota_state_listeners()

    cg.add(var.set_buffer_size(config[CONF_BUFFER_SIZE]))

    if config.get(CONF_TASK_STACK_IN_PSRAM):
        cg.add(var.set_task_stack_in_psram(True))
        esp32.add_idf_sdkconfig_option(
            "CONFIG_SPIRAM_ALLOW_STACK_EXTERNAL_MEMORY", True
        )

    cg.add(var.set_volume_increment(config[CONF_VOLUME_INCREMENT]))
    cg.add(var.set_volume_initial(config[CONF_VOLUME_INITIAL]))
    cg.add(var.set_volume_max(config[CONF_VOLUME_MAX]))
    cg.add(var.set_volume_min(config[CONF_VOLUME_MIN]))

    # Announcement pipeline (always present: the schema requires it).
    announcement = config[CONF_ANNOUNCEMENT_PIPELINE]
    announcement_speaker = await cg.get_variable(announcement[CONF_SPEAKER])
    cg.add(var.set_announcement_speaker(announcement_speaker))
    if announcement[CONF_FORMAT] != "NONE":
        announcement_format = _get_supported_format_struct(
            announcement, "ANNOUNCEMENT"
        )
        cg.add(var.set_announcement_format(announcement_format))

    # Media pipeline (optional).
    media = config.get(CONF_MEDIA_PIPELINE)
    if media:
        media_speaker = await cg.get_variable(media[CONF_SPEAKER])
        cg.add(var.set_media_speaker(media_speaker))
        if media[CONF_FORMAT] != "NONE":
            media_format = _get_supported_format_struct(media, "MEDIA")
            cg.add(var.set_media_format(media_format))

    # Automations; only the volume trigger passes an argument (a float "x").
    mute_automation = config.get(CONF_ON_MUTE)
    if mute_automation:
        await automation.build_automation(
            var.get_mute_trigger(), [], mute_automation
        )

    unmute_automation = config.get(CONF_ON_UNMUTE)
    if unmute_automation:
        await automation.build_automation(
            var.get_unmute_trigger(), [], unmute_automation
        )

    volume_automation = config.get(CONF_ON_VOLUME)
    if volume_automation:
        await automation.build_automation(
            var.get_volume_trigger(), [(cg.float_, "x")], volume_automation
        )

    # Embed every configured media file as a progmem byte array wrapped in an
    # audio.AudioFile struct.
    for file_config in config.get(CONF_FILES, []):
        data, media_file_type = _read_audio_file_and_type(file_config)
        byte_values = [HexInt(byte) for byte in data]
        prog_arr = cg.progmem_array(file_config[CONF_RAW_DATA_ID], byte_values)
        audio_file_struct = cg.StructInitializer(
            audio.AudioFile,
            ("data", prog_arr),
            ("length", len(byte_values)),
            ("file_type", media_file_type),
        )
        cg.new_Pvariable(file_config[CONF_ID], audio_file_struct)
@automation.register_action(
    "media_player.speaker.play_on_device_media_file",
    PlayOnDeviceMediaAction,
    cv.maybe_simple_value(
        {
            cv.GenerateID(): cv.use_id(SpeakerMediaPlayer),
            cv.Required(CONF_MEDIA_FILE): cv.use_id(audio.AudioFile),
            cv.Optional(CONF_ANNOUNCEMENT, default=False): cv.templatable(cv.boolean),
            cv.Optional(CONF_ENQUEUE, default=False): cv.templatable(cv.boolean),
        },
        key=CONF_MEDIA_FILE,
    ),
)
async def play_on_device_media_media_action(config, action_id, template_arg, args):
    """Generate the action that plays an on-device media file.

    Creates the action variable, parents it to the configured media player,
    and sets the audio file plus the (templatable) ``announcement`` and
    ``enqueue`` flags on it. Returns the action variable.
    """
    action = cg.new_Pvariable(action_id, template_arg)
    await cg.register_parented(action, config[CONF_ID])

    # Resolve all values first, in this order, before emitting the setters.
    audio_file = await cg.get_variable(config[CONF_MEDIA_FILE])
    is_announcement = await cg.templatable(config[CONF_ANNOUNCEMENT], args, cg.bool_)
    should_enqueue = await cg.templatable(config[CONF_ENQUEUE], args, cg.bool_)

    cg.add(action.set_audio_file(audio_file))
    cg.add(action.set_announcement(is_announcement))
    cg.add(action.set_enqueue(should_enqueue))
    return action