init commit

This commit is contained in:
unknown
2025-08-19 08:06:37 -04:00
commit 2957b5515a
743 changed files with 45495 additions and 0 deletions
+16
View File
@@ -0,0 +1,16 @@
# Talon command client
This directory contains the client code for communicating with the [VSCode command server](https://marketplace.visualstudio.com/items?itemName=pokey.command-server).
## Contributing
The source of truth is in https://github.com/talonhub/community/tree/main/core/command_client, but the code is also maintained as a subtree at https://github.com/cursorless-dev/talon-command-client.
To contribute, first open a PR on `community`.
Once the PR is merged, you can push the changes to the subtree by running the following commands on an up-to-date `community` main: (need write access)
```sh
git subtree split --prefix=core/command_client --annotate="[split] " -b split
git push git@github.com:cursorless-dev/talon-command-client.git split:main
```
@@ -0,0 +1,199 @@
from pathlib import Path
from typing import Any
from talon import Context, Module, actions, speech_system
from .rpc_client.get_communication_dir_path import get_communication_dir_path
# Indicates whether a pre-phrase signal was emitted during the course of the
# current phrase
did_emit_pre_phrase_signal = False
mod = Module()
ctx = Context()
mac_ctx = Context()
ctx.matches = r"""
tag: user.command_client
"""
mac_ctx.matches = r"""
os: mac
tag: user.command_client
"""
class NotSet:
def __repr__(self):
return "<argument not set>"
def run_command(
command_id: str,
*args,
wait_for_finish: bool = False,
return_command_output: bool = False,
):
"""Runs a command, using command server if available
Args:
command_id (str): The ID of the command to run.
args: The arguments to the command.
wait_for_finish (bool, optional): Whether to wait for the command to finish before returning. Defaults to False.
return_command_output (bool, optional): Whether to return the output of the command. Defaults to False.
Raises:
Exception: If there is an issue with the file-based communication, or
application raises an exception
Returns:
Object: The response from the command, if requested.
"""
# NB: This is a hack to work around the fact that talon doesn't support
# variable argument lists
args = [x for x in args if x is not NotSet]
return actions.user.rpc_client_run_command(
actions.user.command_server_directory(),
actions.user.trigger_command_server_command_execution,
command_id,
args,
wait_for_finish,
return_command_output,
)
@mod.action_class
class Actions:
def run_rpc_command(
command_id: str,
arg1: Any = NotSet,
arg2: Any = NotSet,
arg3: Any = NotSet,
arg4: Any = NotSet,
arg5: Any = NotSet,
):
"""Execute command via RPC."""
run_command(
command_id,
arg1,
arg2,
arg3,
arg4,
arg5,
)
def run_rpc_command_and_wait(
command_id: str,
arg1: Any = NotSet,
arg2: Any = NotSet,
arg3: Any = NotSet,
arg4: Any = NotSet,
arg5: Any = NotSet,
):
"""Execute command via application command server and wait for command to finish."""
run_command(
command_id,
arg1,
arg2,
arg3,
arg4,
arg5,
wait_for_finish=True,
)
def run_rpc_command_get(
command_id: str,
arg1: Any = NotSet,
arg2: Any = NotSet,
arg3: Any = NotSet,
arg4: Any = NotSet,
arg5: Any = NotSet,
) -> Any:
"""Execute command via application command server and return command output."""
return run_command(
command_id,
arg1,
arg2,
arg3,
arg4,
arg5,
return_command_output=True,
)
def command_server_directory() -> str:
"""Return the directory of the command server"""
def trigger_command_server_command_execution():
"""Issue keystroke to trigger command server to execute command that
was written to the file. For internal use only"""
actions.key("ctrl-shift-f17")
def emit_pre_phrase_signal() -> bool:
"""
If in an application supporting the command client, returns True
and touches a file to indicate that a phrase is beginning execution.
Otherwise does nothing and returns False.
"""
return False
def did_emit_pre_phrase_signal() -> bool:
"""Indicates whether the pre-phrase signal was emitted at the start of this phrase"""
# NB: This action is used by cursorless; please don't delete it :)
return did_emit_pre_phrase_signal
@mac_ctx.action_class("user")
class MacUserActions:
def trigger_command_server_command_execution():
actions.key("cmd-shift-f17")
@ctx.action_class("user")
class UserActions:
def emit_pre_phrase_signal():
get_signal_path("prePhrase").touch()
return True
class MissingCommunicationDir(Exception):
pass
def get_signal_path(name: str) -> Path:
"""
Get the path to a signal in the signal subdirectory.
Args:
name (str): The name of the signal
Returns:
Path: The signal path
"""
dir_name = actions.user.command_server_directory()
communication_dir_path = get_communication_dir_path(dir_name)
if not communication_dir_path.exists():
raise MissingCommunicationDir()
signal_dir = communication_dir_path / "signals"
signal_dir.mkdir(parents=True, exist_ok=True)
return signal_dir / name
def pre_phrase(_: Any):
try:
global did_emit_pre_phrase_signal
did_emit_pre_phrase_signal = actions.user.emit_pre_phrase_signal()
except MissingCommunicationDir:
pass
def post_phrase(_: Any):
global did_emit_pre_phrase_signal
did_emit_pre_phrase_signal = False
speech_system.register("pre:phrase", pre_phrase)
speech_system.register("post:phrase", post_phrase)
@@ -0,0 +1,20 @@
from talon import Module
mod = Module()
mod.tag(
"command_client", desc="For applications which implement file-based RPC with Talon"
)
@mod.action_class
class Actions:
def command_server_directory() -> str:
"""
The dirctory which contains the files required for communication between
the application and Talon. This is the only function which absolutely
must be implemented for any application using the command-client. Each
application that supports file-based RPC should use its own directory
name. Note that this action should only return a name; the parent
directory is determined by the core command client code.
"""
@@ -0,0 +1,21 @@
import os
from pathlib import Path
from tempfile import gettempdir
def get_communication_dir_path(name: str) -> Path:
"""Returns directory that is used by command-server for communication
Args:
name (str): The name of the communication dir
Returns:
Path: The path to the communication dir
"""
suffix = ""
# NB: We don't suffix on Windows, because the temp dir is user-specific
# anyways
if hasattr(os, "getuid"):
suffix = f"-{os.getuid()}"
return Path(gettempdir()) / f"{name}{suffix}"
@@ -0,0 +1,52 @@
import json
import time
from pathlib import Path
from typing import Any
from talon import actions
# The amount of time to wait for application to perform a command, in seconds
RPC_COMMAND_TIMEOUT_SECONDS = 3.0
# When doing exponential back off waiting for application to perform a command, how
# long to sleep the first time
MINIMUM_SLEEP_TIME_SECONDS = 0.0005
def read_json_with_timeout(path: Path) -> Any:
"""Repeatedly tries to read a json object from the given path, waiting
until there is a trailing new line indicating that the write is complete
Args:
path (str): The path to read from
Raises:
Exception: If we timeout waiting for a response
Returns:
Any: The json-decoded contents of the file
"""
timeout_time = time.perf_counter() + RPC_COMMAND_TIMEOUT_SECONDS
sleep_time = MINIMUM_SLEEP_TIME_SECONDS
while True:
try:
raw_text = path.read_text()
if raw_text.endswith("\n"):
break
except FileNotFoundError:
# If not found, keep waiting
pass
actions.sleep(sleep_time)
time_left = timeout_time - time.perf_counter()
if time_left < 0:
raise Exception("Timed out waiting for response")
# NB: We use minimum sleep time here to ensure that we don't spin with
# small sleeps due to clock slip
sleep_time = max(min(sleep_time * 2, time_left), MINIMUM_SLEEP_TIME_SECONDS)
return json.loads(raw_text)
@@ -0,0 +1,25 @@
from pathlib import Path
from uuid import uuid4
def robust_unlink(path: Path):
"""Unlink the given file if it exists, and if we're on windows and it is
currently in use, just rename it
Args:
path (Path): The path to unlink
"""
try:
path.unlink(missing_ok=True)
except OSError as e:
if hasattr(e, "winerror") and e.winerror == 32:
graveyard_dir = path.parent / "graveyard"
graveyard_dir.mkdir(parents=True, exist_ok=True)
graveyard_path = graveyard_dir / str(uuid4())
print(
f"WARNING: File {path} was in use when we tried to delete it; "
f"moving to graveyard at path {graveyard_path}"
)
path.rename(graveyard_path)
else:
raise e
@@ -0,0 +1,106 @@
import logging
from typing import Any, Callable
from uuid import uuid4
from talon import Module, actions
from .get_communication_dir_path import get_communication_dir_path
from .read_json_with_timeout import read_json_with_timeout
from .robust_unlink import robust_unlink
from .types import NoFileServerException, Request
from .write_request import write_request
logger = logging.getLogger(__name__)
mod = Module()
@mod.action_class
class Actions:
def rpc_client_run_command(
dir_name: str,
trigger_command_execution: Callable,
command_id: str,
args: list[Any],
wait_for_finish: bool = False,
return_command_output: bool = False,
):
"""Runs a command, using command server if available
Args:
dir_name (str): The name of the directory to use for communication.
trigger_command_execution (Callable): The function to call to trigger command execution.
command_id (str): The ID of the command to run.
args: The arguments to the command.
wait_for_finish (bool, optional): Whether to wait for the command to finish before returning. Defaults to False.
return_command_output (bool, optional): Whether to return the output of the command. Defaults to False.
Raises:
Exception: If there is an issue with the file-based communication, or
application raises an exception
Returns:
Object: The response from the command, if requested.
"""
communication_dir_path = get_communication_dir_path(dir_name)
if not communication_dir_path.exists():
logger.warning(
f"Communication directory not found at: {communication_dir_path}"
)
if args or return_command_output:
raise Exception(
"Communication directory not found. Must use command-server extension for advanced commands"
)
raise NoFileServerException("Communication directory not found")
request_path = communication_dir_path / "request.json"
response_path = communication_dir_path / "response.json"
# Generate uuid that will be mirrored back to us by command server for
# sanity checking
uuid = str(uuid4())
request = Request(
command_id=command_id,
args=args,
wait_for_finish=wait_for_finish,
return_command_output=return_command_output,
uuid=uuid,
)
# First, write the request to the request file, which makes us the sole
# owner because all other processes will try to open it with 'x'
write_request(request, request_path)
# We clear the response file if it does exist, though it shouldn't
if response_path.exists():
print("WARNING: Found old response file")
robust_unlink(response_path)
# Then, perform keystroke telling application to execute the command in the
# request file. Because only the active application instance will accept
# keypresses, we can be sure that the active application instance will be the
# one to execute the command.
trigger_command_execution()
try:
decoded_contents = read_json_with_timeout(response_path)
finally:
# NB: We remove response file first because we want to do this while we
# still own the request file
robust_unlink(response_path)
robust_unlink(request_path)
if decoded_contents["uuid"] != uuid:
raise Exception("uuids did not match")
for warning in decoded_contents["warnings"]:
print(f"WARNING: {warning}")
if decoded_contents["error"] is not None:
raise Exception(decoded_contents["error"])
actions.sleep("25ms")
return decoded_contents["returnValue"]
@@ -0,0 +1,24 @@
from dataclasses import dataclass
from typing import Any
@dataclass
class Request:
command_id: str
args: list[Any]
wait_for_finish: bool
return_command_output: bool
uuid: str
def to_dict(self):
return {
"commandId": self.command_id,
"args": self.args,
"waitForFinish": self.wait_for_finish,
"returnCommandOutput": self.return_command_output,
"uuid": self.uuid,
}
class NoFileServerException(Exception):
pass
@@ -0,0 +1,60 @@
import json
import time
from pathlib import Path
from typing import Any
from .robust_unlink import robust_unlink
from .types import Request
# How old a request file needs to be before we declare it stale and are willing
# to remove it
STALE_TIMEOUT_MS = 60_000
def write_request(request: Request, path: Path):
"""Converts the given request to json and writes it to the file, failing if
the file already exists unless it is stale in which case it replaces it
Args:
request (Request): The request to serialize
path (Path): The path to write to
Raises:
Exception: If another process has an active request file
"""
try:
write_json_exclusive(path, request.to_dict())
request_file_exists = False
except FileExistsError:
request_file_exists = True
if request_file_exists:
handle_existing_request_file(path)
write_json_exclusive(path, request.to_dict())
def write_json_exclusive(path: Path, body: Any):
"""Writes jsonified object to file, failing if the file already exists
Args:
path (Path): The path of the file to write
body (Any): The object to convert to json and write
"""
with path.open("x") as out_file:
out_file.write(json.dumps(body))
def handle_existing_request_file(path):
stats = path.stat()
modified_time_ms = stats.st_mtime_ns / 1e6
current_time_ms = time.time() * 1e3
time_difference_ms = abs(modified_time_ms - current_time_ms)
if time_difference_ms < STALE_TIMEOUT_MS:
raise Exception(
"Found recent request file; another Talon process is probably running"
)
print("Removing stale request file")
robust_unlink(path)