Files
2026-04-26 22:44:16 -04:00

430 lines
16 KiB
Python

import os
import time
import urllib.error
import urllib.parse
import urllib.request
from calibre.devices.errors import ControlError
from calibre.devices.interface import DevicePlugin
from calibre.devices.usbms.deviceconfig import DeviceConfig
from calibre.devices.usbms.books import Book, BookList
from calibre.ebooks.metadata.book.base import Metadata
from . import ws_client
from .config import CrossPointConfigWidget, PREFS
from .log import add_log
class CrossPointDevice(DeviceConfig, DevicePlugin):
name = 'CrossPoint Reader'
gui_name = 'CrossPoint Reader'
description = 'CrossPoint Reader wireless device'
supported_platforms = ['windows', 'osx', 'linux']
author = 'CrossPoint Reader'
version = (0, 1, 4)
# Invalid USB vendor info to avoid USB scans matching.
VENDOR_ID = [0xFFFF]
PRODUCT_ID = [0xFFFF]
BCD = [0xFFFF]
FORMATS = ['epub']
ALL_FORMATS = ['epub']
SUPPORTS_SUB_DIRS = True
MUST_READ_METADATA = False
MANAGES_DEVICE_PRESENCE = True
DEVICE_PLUGBOARD_NAME = 'CROSSPOINT_READER'
MUST_READ_METADATA = False
SUPPORTS_DEVICE_DB = False
# Disable Calibre's device cache so we always refresh from device.
device_is_usb_mass_storage = False
def __init__(self, path):
super().__init__(path)
self.is_connected = False
self.device_host = None
self.device_port = None
self.last_discovery = 0.0
self.report_progress = lambda x, y: x
self._debug_enabled = False
def _log(self, message):
add_log(message)
if self._debug_enabled:
try:
self.report_progress(0.0, message)
except Exception:
pass
# Device discovery / presence
def _discover(self):
now = time.time()
if now - self.last_discovery < 2.0:
return None, None
self.last_discovery = now
host, port = ws_client.discover_device(
timeout=1.0,
debug=PREFS['debug'],
logger=self._log,
extra_hosts=[PREFS['host']],
)
if host and port:
return host, port
return None, None
def detect_managed_devices(self, devices_on_system, force_refresh=False):
if self.is_connected:
return self
debug = PREFS['debug']
self._debug_enabled = debug
if debug:
self._log('[CrossPoint] detect_managed_devices')
host, port = self._discover()
if host:
if debug:
self._log(f'[CrossPoint] discovered {host} {port}')
self.device_host = host
self.device_port = port
self.is_connected = True
return self
if debug:
self._log('[CrossPoint] discovery failed')
return None
def open(self, connected_device, library_uuid):
if not self.is_connected:
raise ControlError(desc='Attempt to open a closed device')
return True
def get_device_information(self, end_session=True):
host = self.device_host or PREFS['host']
device_info = {
'device_store_uuid': 'crosspoint-' + host.replace('.', '-'),
'device_name': 'CrossPoint Reader',
'device_version': '1',
}
return (self.gui_name, '1', '1', '', {'main': device_info})
def reset(self, key='-1', log_packets=False, report_progress=None, detected_device=None):
self.set_progress_reporter(report_progress)
def set_progress_reporter(self, report_progress):
if report_progress is None:
self.report_progress = lambda x, y: x
else:
self.report_progress = report_progress
def _http_base(self):
host = self.device_host or PREFS['host']
return f'http://{host}'
def _http_get_json(self, path, params=None, timeout=5):
url = self._http_base() + path
if params:
url += '?' + urllib.parse.urlencode(params)
try:
with urllib.request.urlopen(url, timeout=timeout) as resp:
data = resp.read().decode('utf-8', 'ignore')
except Exception as exc:
raise ControlError(desc=f'HTTP request failed: {exc}')
try:
import json
return json.loads(data)
except Exception as exc:
raise ControlError(desc=f'Invalid JSON response: {exc}')
def _http_post_form(self, path, data, timeout=5):
url = self._http_base() + path
body = urllib.parse.urlencode(data).encode('utf-8')
req = urllib.request.Request(url, data=body, method='POST',
headers={'Content-Type': 'application/x-www-form-urlencoded'})
try:
with urllib.request.urlopen(req, timeout=timeout) as resp:
return resp.status, resp.read().decode('utf-8', 'ignore')
except Exception as exc:
raise ControlError(desc=f'HTTP request failed: {exc}')
def config_widget(self):
return CrossPointConfigWidget()
def save_settings(self, config_widget):
config_widget.save()
def _list_files_recursive(self, path='/'):
"""Return a flat list of (lpath, size) for all EPUB files on device."""
results = []
try:
entries = self._http_get_json('/api/files', params={'path': path})
except Exception as exc:
self._log(f'[CrossPoint] listing {path} failed: {exc}')
return results
for entry in entries:
name = entry.get('name', '')
if not name:
continue
if path == '/':
entry_path = '/' + name
else:
entry_path = path + '/' + name
if entry.get('isDirectory'):
results.extend(self._list_files_recursive(entry_path))
elif entry.get('isEpub'):
results.append((entry_path, entry.get('size', 0)))
return results
def books(self, oncard=None, end_session=True):
if oncard is not None:
return BookList(None, None, None)
file_list = self._list_files_recursive('/')
bl = BookList(None, None, None)
fetch_metadata = PREFS['fetch_metadata']
for lpath, size in file_list:
title = os.path.splitext(os.path.basename(lpath))[0]
meta = Metadata(title, [])
if fetch_metadata:
try:
from calibre.customize.ui import quick_metadata
from calibre.ebooks.metadata.meta import get_metadata
with self._download_temp(lpath) as tf:
with quick_metadata:
m = get_metadata(tf, stream_type='epub', force_read_metadata=True)
if m is not None:
meta = m
except Exception as exc:
self._log(f'[CrossPoint] metadata read failed for {lpath}: {exc}')
book = Book('', lpath, size=size, other=meta)
bl.add_book(book, replace_metadata=True)
return bl
def sync_booklists(self, booklists, end_session=True):
# No on-device metadata sync supported.
return None
def card_prefix(self, end_session=True):
return None, None
def total_space(self, end_session=True):
return 10 * 1024 * 1024 * 1024, 0, 0
def free_space(self, end_session=True):
return 10 * 1024 * 1024 * 1024, 0, 0
def _format_upload_path(self, mi, original_name):
"""Format an upload path using the send-to-device template.
Returns (subdirs, filename) where subdirs is a list of directory
components from the template (may be empty for flat templates).
"""
try:
from calibre.library.save_to_disk import config as sconfig, get_components
from calibre.utils.filenames import ascii_filename
template = self.save_template()
if not template:
template = sconfig().parse().send_template
components = get_components(
template, mi, -1, '%b %Y', 250,
ascii_filename, to_lowercase=False,
replace_whitespace=False, safe_format=True,
last_has_extension=False,
)
components = [c.strip() for c in components if c and c.strip()]
if not components:
return [], original_name
ext = os.path.splitext(original_name)[1]
filename = components[-1] + ext
subdirs = components[:-1]
return subdirs, filename
except Exception as exc:
self._log(f'[CrossPoint] template format failed: {exc}')
return [], original_name
def _mkdir_on_device(self, name, path):
"""Create a directory on device via POST /mkdir.
Silently ignores 400 errors (folder already exists).
Uses urllib directly to avoid _http_post_form which wraps all
errors as ControlError.
"""
url = self._http_base() + '/mkdir'
body = urllib.parse.urlencode({'name': name, 'path': path}).encode('utf-8')
req = urllib.request.Request(url, data=body, method='POST',
headers={'Content-Type': 'application/x-www-form-urlencoded'})
try:
with urllib.request.urlopen(req, timeout=5) as resp:
resp.read()
except urllib.error.HTTPError as exc:
if exc.code == 400:
self._log(f'[CrossPoint] mkdir ignored (already exists): {name} in {path}')
else:
raise ControlError(desc=f'mkdir failed for {name} in {path}: {exc}')
except Exception as exc:
raise ControlError(desc=f'mkdir failed for {name} in {path}: {exc}')
def _ensure_dir(self, parent_path, subdirs):
"""Ensure subdirectories exist under parent_path on device.
Creates the full nested path with a single mkdir call (device
uses recursive mkdir). Returns the full directory path.
"""
subdir_path = '/'.join(subdirs)
self._mkdir_on_device(subdir_path, parent_path)
if parent_path == '/':
return '/' + subdir_path
return parent_path + '/' + subdir_path
def upload_books(self, files, names, on_card=None, end_session=True, metadata=None):
host = self.device_host or PREFS['host']
port = self.device_port or PREFS['port']
upload_path = PREFS['path']
chunk_size = PREFS['chunk_size']
if chunk_size > 2048:
self._log(f'[CrossPoint] chunk_size capped to 2048 (was {chunk_size})')
chunk_size = 2048
debug = PREFS['debug']
# Normalize base upload path
base_path = upload_path
if not base_path.startswith('/'):
base_path = '/' + base_path
if base_path != '/' and base_path.endswith('/'):
base_path = base_path[:-1]
paths = []
total = len(files)
for i, (infile, name) in enumerate(zip(files, names)):
if hasattr(infile, 'read'):
filepath = getattr(infile, 'name', None)
if not filepath:
raise ControlError(desc='In-memory uploads are not supported')
else:
filepath = infile
filename = os.path.basename(name)
subdirs = []
if metadata and i < len(metadata) and not PREFS['send_to_root']:
subdirs, filename = self._format_upload_path(metadata[i], filename)
if subdirs:
target_dir = self._ensure_dir(base_path, subdirs)
else:
target_dir = base_path
if target_dir == '/':
lpath = '/' + filename
else:
lpath = target_dir + '/' + filename
def _progress(sent, size):
if size > 0:
self.report_progress((i + sent / float(size)) / float(total),
'Transferring books to device...')
ws_client.upload_file(
host,
port,
target_dir,
filename,
filepath,
chunk_size=chunk_size,
debug=debug,
progress_cb=_progress,
logger=self._log,
)
paths.append((lpath, os.path.getsize(filepath)))
self.report_progress(1.0, 'Transferring books to device...')
return paths
def add_books_to_metadata(self, locations, metadata, booklists):
self._log(f'[CrossPoint] add_books_to_metadata: {len(locations)} locations, '
f'{len(booklists)} booklists')
metadata = iter(metadata)
for location in locations:
info = next(metadata)
lpath = location[0]
length = location[1]
book = Book('', lpath, size=length, other=info)
if booklists:
booklists[0].add_book(book, replace_metadata=True)
self._log(f'[CrossPoint] added to booklist: {lpath}')
else:
self._log(f'[CrossPoint] WARNING: booklists empty, could not add {lpath}')
def delete_books(self, paths, end_session=True):
import json as _json
self._log(f'[CrossPoint] deleting {len(paths)} books: {paths}')
url = self._http_base() + '/delete'
# Server expects form field 'paths' containing a JSON array string
body = urllib.parse.urlencode({'paths': _json.dumps(list(paths))}).encode('utf-8')
req = urllib.request.Request(url, data=body, method='POST',
headers={'Content-Type': 'application/x-www-form-urlencoded'})
try:
with urllib.request.urlopen(req, timeout=10) as resp:
self._log(f'[CrossPoint] delete OK: {resp.status}')
except urllib.error.HTTPError as exc:
err_body = exc.read().decode('utf-8', 'ignore') if exc.fp else ''
self._log(f'[CrossPoint] delete error {exc.code}: {err_body}')
raise ControlError(desc=f'Delete failed: {exc.code} {err_body}')
except Exception as exc:
raise ControlError(desc=f'Delete failed: {exc}')
def remove_books_from_metadata(self, paths, booklists):
def norm(p):
if not p:
return ''
p = p.replace('\\', '/')
if not p.startswith('/'):
p = '/' + p
return p
deleted = set(norm(p) for p in paths)
self._log(f'[CrossPoint] deleted paths: {sorted(deleted)}')
removed = 0
for bl in booklists:
for book in tuple(bl):
bpath = norm(getattr(book, 'path', ''))
blpath = norm(getattr(book, 'lpath', ''))
if bpath in deleted or blpath in deleted:
bl.remove_book(book)
removed += 1
self._log(f'[CrossPoint] removed {removed} items from device list')
def get_file(self, path, outfile, end_session=True, this_book=None, total_books=None):
url = self._http_base() + '/download'
params = urllib.parse.urlencode({'path': path})
try:
with urllib.request.urlopen(url + '?' + params, timeout=10) as resp:
while True:
chunk = resp.read(65536)
if not chunk:
break
outfile.write(chunk)
except Exception as exc:
raise ControlError(desc=f'Failed to download {path}: {exc}')
def _download_temp(self, path):
from calibre.ptempfile import PersistentTemporaryFile
tf = PersistentTemporaryFile(suffix='.epub')
self.get_file(path, tf)
tf.flush()
tf.seek(0)
return tf
def eject(self):
self.is_connected = False
def is_dynamically_controllable(self):
return 'crosspoint'
def start_plugin(self):
return None
def stop_plugin(self):
self.is_connected = False