LinVAM/keyboard/_nixcommon.py

175 lines
5.9 KiB
Python

# -*- coding: utf-8 -*-
import struct
import os
import atexit
from time import time as now
from threading import Thread
from glob import glob
try:
from queue import Queue
except ImportError:
from Queue import Queue
event_bin_format = 'llHHI'
# Taken from include/linux/input.h
# https://www.kernel.org/doc/Documentation/input/event-codes.txt
EV_SYN = 0x00
EV_KEY = 0x01
EV_REL = 0x02
EV_ABS = 0x03
EV_MSC = 0x04
def make_uinput():
if not os.path.exists('/dev/uinput'):
raise IOError('No uinput module found.')
import fcntl, struct
# Requires uinput driver, but it's usually available.
uinput = open("/dev/uinput", 'wb')
UI_SET_EVBIT = 0x40045564
fcntl.ioctl(uinput, UI_SET_EVBIT, EV_KEY)
UI_SET_KEYBIT = 0x40045565
for i in range(256):
fcntl.ioctl(uinput, UI_SET_KEYBIT, i)
BUS_USB = 0x03
uinput_user_dev = "80sHHHHi64i64i64i64i"
axis = [0] * 64 * 4
uinput.write(struct.pack(uinput_user_dev, b"Virtual Keyboard", BUS_USB, 1, 1, 1, 0, *axis))
uinput.flush() # Without this you may get Errno 22: Invalid argument.
UI_DEV_CREATE = 0x5501
fcntl.ioctl(uinput, UI_DEV_CREATE)
UI_DEV_DESTROY = 0x5502
#fcntl.ioctl(uinput, UI_DEV_DESTROY)
return uinput
class EventDevice(object):
def __init__(self, path):
self.path = path
self._input_file = None
self._output_file = None
@property
def input_file(self):
if self._input_file is None:
try:
self._input_file = open(self.path, 'rb')
except IOError as e:
if e.strerror == 'Permission denied':
print('Permission denied ({}). You must be sudo to access global events.'.format(self.path))
exit()
def try_close():
try:
self._input_file.close
except:
pass
atexit.register(try_close)
return self._input_file
@property
def output_file(self):
if self._output_file is None:
self._output_file = open(self.path, 'wb')
atexit.register(self._output_file.close)
return self._output_file
def read_event(self):
data = self.input_file.read(struct.calcsize(event_bin_format))
seconds, microseconds, type, code, value = struct.unpack(event_bin_format, data)
return seconds + microseconds / 1e6, type, code, value, self.path
def write_event(self, type, code, value):
integer, fraction = divmod(now(), 1)
seconds = int(integer)
microseconds = int(fraction * 1e6)
data_event = struct.pack(event_bin_format, seconds, microseconds, type, code, value)
# Send a sync event to ensure other programs update.
sync_event = struct.pack(event_bin_format, seconds, microseconds, EV_SYN, 0, 0)
self.output_file.write(data_event + sync_event)
self.output_file.flush()
class AggregatedEventDevice(object):
def __init__(self, devices, output=None):
self.event_queue = Queue()
self.devices = devices
self.output = output or self.devices[0]
def start_reading(device):
while True:
self.event_queue.put(device.read_event())
for device in self.devices:
thread = Thread(target=start_reading, args=[device])
thread.setDaemon(True)
thread.start()
def read_event(self):
return self.event_queue.get(block=True)
def write_event(self, type, code, value):
self.output.write_event(type, code, value)
import re
from collections import namedtuple
DeviceDescription = namedtuple('DeviceDescription', 'event_file is_mouse is_keyboard')
device_pattern = r"""N: Name="([^"]+?)".+?H: Handlers=([^\n]+)"""
def list_devices_from_proc(type_name):
try:
with open('/proc/bus/input/devices') as f:
description = f.read()
except FileNotFoundError:
return
devices = {}
for name, handlers in re.findall(device_pattern, description, re.DOTALL):
path = '/dev/input/event' + re.search(r'event(\d+)', handlers).group(1)
if type_name in handlers:
yield EventDevice(path)
def list_devices_from_by_id(name_suffix, by_id=True):
for path in glob('/dev/input/{}/*-event-{}'.format('by-id' if by_id else 'by-path', name_suffix)):
yield EventDevice(path)
def aggregate_devices(type_name):
# Some systems have multiple keyboards with different range of allowed keys
# on each one, like a notebook with a "keyboard" device exclusive for the
# power button. Instead of figuring out which keyboard allows which key to
# send events, we create a fake device and send all events through there.
try:
uinput = make_uinput()
fake_device = EventDevice('uinput Fake Device')
fake_device._input_file = uinput
fake_device._output_file = uinput
except IOError as e:
import warnings
warnings.warn('Failed to create a device file using `uinput` module. Sending of events may be limited or unavailable depending on plugged-in devices.', stacklevel=2)
fake_device = None
# We don't aggregate devices from different sources to avoid
# duplicates.
devices_from_proc = list(list_devices_from_proc(type_name))
if devices_from_proc:
return AggregatedEventDevice(devices_from_proc, output=fake_device)
# breaks on mouse for virtualbox
# was getting /dev/input/by-id/usb-VirtualBox_USB_Tablet-event-mouse
devices_from_by_id = list(list_devices_from_by_id(type_name)) or list(list_devices_from_by_id(type_name, by_id=False))
if devices_from_by_id:
return AggregatedEventDevice(devices_from_by_id, output=fake_device)
# If no keyboards were found we can only use the fake device to send keys.
assert fake_device
return fake_device
def ensure_root():
if os.geteuid() != 0:
raise ImportError('You must be root to use this library on linux.')