File: //opt/imunify360/venv/lib64/python3.11/site-packages/imav/wordpress/cli.py
"""
This program is free software: you can redistribute it and/or modify it under
the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License,
or (at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
See the GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <https://www.gnu.org/licenses/>.
Copyright © 2019 Cloud Linux Software Inc.
This software is also available under ImunifyAV commercial license,
see <https://www.imunify360.com/legal/eula>
"""
import logging
import pwd
import re
from pathlib import Path
from distutils.version import StrictVersion
from defence360agent.utils import (
check_run,
CheckRunError,
async_lru_cache,
)
from imav.wordpress import PLUGIN_PATH, PLUGIN_SLUG
from imav.wordpress.utils import (
build_command_for_user,
get_php_binary_path,
wp_wrapper,
)
from defence360agent.sentry import log_message
from imav.model.wordpress import WPSite
logger = logging.getLogger(__name__)
def _validate_semver(version_str: str) -> bool:
"""Validate if a string is a valid semantic version."""
# Handle None and non-string inputs
if not isinstance(version_str, str):
return False
# Trim the string and return False if empty
trimmed_str = version_str.strip()
if not trimmed_str:
return False
try:
StrictVersion(trimmed_str)
return True
except ValueError:
return False
def _extract_version_from_output(output: str) -> str:
"""
Extract version from WP CLI output, trying both first and last parts.
Args:
output: The raw output from WP CLI
Returns:
The extracted version string or None if no valid version found
"""
if not output:
return None
# Split the output into parts
parts = output.split()
if not parts:
return None
# Try the first part
if len(parts) > 0:
first_part = parts[0].strip()
if _validate_semver(first_part):
return first_part
# Try the last part
if len(parts) > 1:
last_part = parts[-1].strip()
if _validate_semver(last_part):
return last_part
# If neither first nor last part is valid semver, log to sentry
log_message(
"Failed to extract valid semver version from WP CLI output. Output:"
" '{output}'",
format_args={"output": output},
level="warning",
fingerprint="wp-plugin-version-extraction-failed",
)
# Return None when no valid semver is found
return None
async def _parse_version_from_plugin_file(site: WPSite) -> str:
"""
Parse the version of imunify-security plugin by reading the main plugin file.
Args:
site: WordPress site object containing docroot path
Returns:
str: Plugin version or None if not found or invalid
"""
content_dir = await get_content_dir(site)
plugin_file = (
content_dir / "plugins" / "imunify-security" / "imunify-security.php"
)
if not plugin_file.exists():
return None
version_pattern = re.compile(r"\* Version:\s*([0-9.]+)")
try:
with open(plugin_file) as f:
for line in f:
match = version_pattern.search(line)
if match:
version = match.group(1)
if _validate_semver(version):
return version
else:
return None
except Exception as e:
logger.error(
"Failed to read plugin file to determine version number %s: %s",
plugin_file,
e,
)
return None
return None
async def plugin_install(site: WPSite):
"""Install the Imunify Security WordPress plugin on given WordPress site."""
username = pwd.getpwuid(site.uid).pw_name
php_path = await get_php_binary_path(site, username)
args = [
*wp_wrapper(php_path, site.docroot),
"plugin",
"install",
str(PLUGIN_PATH),
"--activate",
"--force",
]
command = build_command_for_user(username, args)
logger.info(f"Installing wp plugin {command}")
await check_run(command)
async def plugin_update(site: WPSite):
"""
Update the Imunify Security WordPress plugin on given WordPress site.
Currently, this is the same as install, but in the future it may differ.
"""
username = pwd.getpwuid(site.uid).pw_name
php_path = await get_php_binary_path(site, username)
args = [
*wp_wrapper(php_path, site.docroot),
"plugin",
"install",
str(PLUGIN_PATH),
"--activate",
"--force",
]
command = build_command_for_user(username, args)
logger.info(f"Updating wp plugin {command}")
await check_run(command)
async def plugin_uninstall(site: WPSite):
"""Uninstall the imunify-security wp plugin from given wp site."""
username = pwd.getpwuid(site.uid).pw_name
php_path = await get_php_binary_path(site, username)
args = [
*wp_wrapper(php_path, site.docroot),
"plugin",
"uninstall",
PLUGIN_SLUG,
"--deactivate",
]
command = build_command_for_user(username, args)
logger.info(f"Uninstalling wp plugin {command}")
await check_run(command)
async def _get_plugin_version(site: WPSite):
"""
Get the version of the imunify-security wp plugin installed on given WordPress site.
Uses WP CLI to get the version.
"""
username = pwd.getpwuid(site.uid).pw_name
php_path = await get_php_binary_path(site, username)
args = [
*wp_wrapper(php_path, site.docroot),
"plugin",
"get",
PLUGIN_SLUG,
"--field=version",
]
command = build_command_for_user(username, args)
logger.info(f"Getting wp plugin version {command}")
try:
result = await check_run(command)
output = result.decode("utf-8").strip()
return _extract_version_from_output(output)
except CheckRunError as e:
logger.error(
"Failed to get wp plugin version. Return code: %s",
e.returncode,
)
return None
except UnicodeDecodeError as e:
logger.error("Failed to decode wp plugin version output: %s", e)
return None
async def get_plugin_version(site: WPSite):
"""
Get the version of the imunify-security wp plugin installed on given WordPress site.
First tries to parse the version from the plugin file, then falls back to WP CLI.
"""
# First try to parse version from plugin file
try:
version = await _parse_version_from_plugin_file(site)
if version:
return version
except Exception as e:
logger.warning("Failed to parse version from plugin file: %s", e)
# Fall back to WP CLI if file parsing fails
logger.info("Plugin version not found in file, trying WP CLI")
return await _get_plugin_version(site)
async def is_plugin_installed(site: WPSite):
"""Check if the imunify-security wp plugin is installed on given WordPress site."""
username = pwd.getpwuid(site.uid).pw_name
php_path = await get_php_binary_path(site, username)
args = [
*wp_wrapper(php_path, site.docroot),
"plugin",
"is-installed",
PLUGIN_SLUG,
]
command = build_command_for_user(username, args)
logger.info(f"Checking if wp plugin is installed {command}")
try:
await check_run(command)
except CheckRunError:
# exit code other than 0 means plugin is not installed or there is an error
return False
# exit code 0 means plugin is installed
return True
async def is_wordpress_installed(site: WPSite):
"""Check if WordPress is installed and given site is accessible using WP CLI."""
username = pwd.getpwuid(site.uid).pw_name
php_path = await get_php_binary_path(site, username)
args = [
*wp_wrapper(php_path, site.docroot),
"core",
"is-installed",
]
command = build_command_for_user(username, args)
logger.info(f"Checking if WordPress is installed {command}")
try:
# exit code other than 0 means WordPress is not installed or there is an error
await check_run(command)
except CheckRunError:
return False
# exit code 0 means WordPress is installed
return True
async def _get_content_directory(site: WPSite):
"""
Get the content directory of the WordPress site using WP CLI.
This should only be used if the default wp-content directory does not exist.
"""
username = pwd.getpwuid(site.uid).pw_name
php_path = await get_php_binary_path(site, username)
args = [
*wp_wrapper(php_path, site.docroot),
"eval",
"echo WP_CONTENT_DIR;",
]
command = build_command_for_user(username, args)
logger.info(f"Getting content directory {command}")
try:
result = await check_run(command)
return result.decode("utf-8").strip()
except CheckRunError as e:
logger.error(
"Failed to get content directory. Return code: %s",
e.returncode,
)
return None
except UnicodeDecodeError as e:
logger.error("Failed to decode content directory output: %s", e)
return None
@async_lru_cache(maxsize=100)
async def get_content_dir(site: WPSite):
"""
Get the WordPress content directory for the given WordPress site.
This function first checks if the default wp-content directory exists at the site's docroot.
If the default path doesn't exist or isn't a directory, it attempts to get the actual
content directory using WordPress CLI's WP_CONTENT_DIR constant.
Returns:
Path: The WordPress content directory path
"""
content_dir = Path(site.docroot) / "wp-content"
# First check if content_dir exists and is a folder
if not content_dir.exists() or not content_dir.is_dir():
# If not, try to get the content directory using WP CLI
wp_content_dir = await _get_content_directory(site)
if wp_content_dir:
content_dir = Path(wp_content_dir)
return content_dir
def clear_get_content_dir_cache():
"""Clear the async LRU cache for get_content_dir."""
get_content_dir.cache_clear()
async def get_data_dir(site: WPSite):
"""
Get the Imunify Security data directory for the given WordPress site.
"""
content_dir = await get_content_dir(site)
if not content_dir:
content_dir = Path(site.docroot) / "wp-content"
return Path(content_dir) / "imunify-security"