原文:https://greptime.com/blogs/2024-03-19-keyboard-monitoring
代码:https://github.com/GreptimeTeam/demo-scene/tree/main/keyboard-monitor
项目简介
该项目实现了打字频率统计及可视化功能。
主要使用的库
pynput:用于控制和监听输入设备,这里用来获取键盘输入。
SQLAlchemy:数据库操作库,这里用来保存键盘输入。
streamlit:提供可视化界面。
项目组成
agent.py :获得键盘输入
display.py:可视化
补充说明
如果你不想使用原文中的数据库,也可以替换为本地数据库,例如免安装的 SQLite。
agent.py
# agent.py
"""Keyboard-monitoring agent.

Listens for key presses with ``pynput`` and stores every hit (a single key or
a ``modifier+...+key`` combination) as one row of the ``keyboard_monitor``
table, together with the database-side timestamp of the insert.

Two worker threads cooperate through queues:

* the listener thread turns key events into hit strings and enqueues them;
* the sender thread dequeues hit strings and inserts them into the database.
"""
import concurrent.futures
import logging
import os
import queue
import sys
import time

import sqlalchemy
import sqlalchemy.exc
from dotenv import load_dotenv
from pynput import keyboard
from pynput.keyboard import Key

# Keys that are tracked as "held down" rather than recorded as hits of their own.
MODIFIERS = {
    Key.shift, Key.shift_l, Key.shift_r,
    Key.alt, Key.alt_l, Key.alt_r, Key.alt_gr,
    Key.ctrl, Key.ctrl_l, Key.ctrl_r,
    Key.cmd, Key.cmd_l, Key.cmd_r,
}

# Single source of truth for the table: used both to create it and to insert.
TABLE = sqlalchemy.Table(
    'keyboard_monitor',
    sqlalchemy.MetaData(),
    sqlalchemy.Column('hits', sqlalchemy.String, nullable=True),
    sqlalchemy.Column('ts', sqlalchemy.TIMESTAMP, nullable=False),
)

# Number of consecutive failed inserts after which failures are logged as errors.
MAX_RETRIES = 10


def _configure_logging():
    """Return the ``agent`` logger writing DEBUG to a file and INFO to stdout."""
    log = logging.getLogger("agent")
    log.setLevel(logging.DEBUG)
    formatter = logging.Formatter('%(asctime)s %(levelname)s %(funcName)s %(message)s')

    # A fresh log file per run; the nanosecond timestamp keeps names unique.
    file_handler = logging.FileHandler(f'agent-{time.time_ns()}.log', encoding='utf-8')
    file_handler.setLevel(logging.DEBUG)
    file_handler.setFormatter(formatter)

    stdout_handler = logging.StreamHandler(sys.stdout)
    stdout_handler.setLevel(logging.INFO)
    stdout_handler.setFormatter(formatter)

    log.addHandler(file_handler)
    log.addHandler(stdout_handler)
    return log


def main():
    """Run the listener and sender threads until interrupted or one of them fails."""
    load_dotenv()
    log = _configure_logging()

    # Local SQLite file by default. To use the remote database of the original
    # article instead, build the engine from os.environ['DATABASE_URL'] with
    # echo_pool=True and isolation_level='AUTOCOMMIT'.
    engine = sqlalchemy.create_engine("sqlite:///keyboard.db")
    TABLE.metadata.create_all(engine)  # no-op when the table already exists

    current_modifiers = set()
    pending_hits = queue.Queue()   # hit strings waiting to be stored; None = stop
    cancel_signal = queue.Queue()  # any item tells the listener thread to stop

    def on_press(key):
        """Track held modifiers; enqueue every other key as a hit string."""
        if key in MODIFIERS:
            current_modifiers.add(key)
        else:
            # Modifiers are sorted so the same combo always yields the same string.
            parts = sorted(str(modifier) for modifier in current_modifiers)
            parts.append(str(key))
            pending_hits.put('+'.join(parts))
        log.debug(f'{key} pressed, current_modifiers: {current_modifiers}')

    def on_release(key):
        """Forget a modifier once it is released."""
        if key in MODIFIERS:
            try:
                current_modifiers.remove(key)
            except KeyError:
                log.warning(f'Key {key} not in current_modifiers {current_modifiers}')
        log.debug(f'{key} released, current_modifiers: {current_modifiers}')

    def sender_thread():
        """Insert queued hits into the database until a ``None`` sentinel arrives.

        Known transient operational errors re-queue the hit for another
        attempt; any other operational error is re-raised.
        """
        retries = 0  # consecutive failed inserts
        while True:
            hits = pending_hits.get()
            log.debug(f'got: {hits}')
            if hits is None:
                log.info("Exiting...")
                break

            with engine.connect() as connection:
                try:
                    log.debug(f'sending: {hits}')
                    connection.execute(
                        TABLE.insert().values(hits=hits, ts=sqlalchemy.func.now()))
                    connection.commit()
                    log.info(f'sent: {hits}')
                    retries = 0
                except sqlalchemy.exc.OperationalError as e:
                    # Count failures only; a successful insert resets the counter.
                    # NOTE(review): re-queued hits are retried immediately, with
                    # no back-off - consider sleeping between attempts.
                    retries += 1
                    if retries > MAX_RETRIES:
                        log.error(f'Retry exceeds. Operational error: {e}')
                        pending_hits.put(hits)
                        continue
                    if e.connection_invalidated:
                        log.warning(f'Connection invalidated: {e}')
                        pending_hits.put(hits)
                        continue
                    msg = str(e)
                    if "(1815, 'Internal error: 1000')" in msg:
                        # TODO 1815 - should not handle internal error;
                        # see https://github.com/GreptimeTeam/greptimedb/issues/3447
                        log.warning(f'Known operational error: {e}')
                        pending_hits.put(hits)
                        continue
                    if '2005' in msg and 'Unknown MySQL server host' in msg:
                        log.warning(f'DNS temporary unresolved: {e}')
                        pending_hits.put(hits)
                        continue
                    raise

    def listener_thread():
        """Listen for key events until cancelled, then tell the sender to stop."""
        with keyboard.Listener(on_press=on_press, on_release=on_release):
            log.info("Listening...")
            cancel_signal.get()
            pending_hits.put(None)
            log.info("Exiting...")

    with concurrent.futures.ThreadPoolExecutor() as executor:
        sender = executor.submit(sender_thread)
        listener = executor.submit(listener_thread)
        try:
            finished = concurrent.futures.wait(
                [sender, listener],
                return_when=concurrent.futures.FIRST_EXCEPTION)
            for fut in finished.done:
                error = fut.exception(timeout=0)
                if error is not None:
                    log.error(f'Unhandled exception for futures: {error}')
        except KeyboardInterrupt:
            log.info("KeyboardInterrupt. Exiting...")
        except Exception as e:
            log.error(f'Unhandled exception: {e}')
        finally:
            # Stop both workers. The sender gets its own sentinel so it cannot
            # block forever if the listener died before sending one.
            cancel_signal.put(True)
            pending_hits.put(None)


if __name__ == '__main__':
    main()
display.py
# display.py
"""Streamlit dashboard for the keystrokes recorded by agent.py.

Reads the ``keyboard_monitor`` table from the local SQLite database and shows
the total number of hits, the most frequent key and combo, the top-10 keys and
combos, and an hourly breakdown for a selected day.
"""
import datetime
import os

import pandas
import pytz
import streamlit as st
import tzlocal
from dotenv import load_dotenv

# Shown in a metric when the table has no matching rows yet.
NO_DATA = "N/A"


def _top_hits(conn, *, combos, limit):
    """Return the ``limit`` most frequent hits as a DataFrame (``hits``, ``times``).

    A hit whose string contains ``+`` is a combo (modifier + key); ``combos``
    selects whether combos or single keys are counted.
    """
    operator = "LIKE" if combos else "NOT LIKE"
    return conn.query(
        f"""
        SELECT hits, COUNT(*) AS times
        FROM keyboard_monitor
        WHERE hits {operator} '%+%'
        GROUP BY hits
        ORDER BY times DESC
        LIMIT :limit
        """,
        params={"limit": limit},
    )


def _most_frequent(conn, *, combos):
    """Return the single most frequent hit, or ``NO_DATA`` if there is none."""
    top = _top_hits(conn, combos=combos, limit=1)
    if top.empty:
        return NO_DATA
    return top["hits"].iloc[0]


st.title("Keyboard Monitor")
load_dotenv()

conn = st.connection('keyboard', type='sql', url="sqlite:///keyboard.db")

df = conn.query("SELECT COUNT(*) AS total_hits FROM keyboard_monitor")
st.metric("Total hits", df.total_hits[0])

most_frequent_key, most_frequent_combo = st.columns(2)
most_frequent_key.metric("Most frequent key", _most_frequent(conn, combos=False))
most_frequent_combo.metric("Most frequent combo", _most_frequent(conn, combos=True))

top_frequent_keys, top_frequent_combos = st.columns(2)
top_frequent_keys.subheader("Top 10 keys")
top_frequent_keys.dataframe(_top_hits(conn, combos=False, limit=10))
top_frequent_combos.subheader("Top 10 combos")
top_frequent_combos.dataframe(_top_hits(conn, combos=True, limit=10))

st.header("Find your inputs frequency of day")
local_tz = tzlocal.get_localzone()
d = st.date_input("Pick a day:", value=datetime.date.today())

# The day is passed as a bound parameter rather than formatted into the SQL.
# NOTE(review): LIMIT 10 keeps only the first ten hourly buckets of the day,
# and buckets are formed on the stored hour while the day filter uses local
# time - confirm both are intended.
query = """
SELECT ts, COUNT(1) AS times
FROM keyboard_monitor
WHERE strftime('%Y-%m-%d', ts, 'localtime') = :day
GROUP BY strftime('%Y-%m-%d %H:00:00', ts)
ORDER BY ts ASC
LIMIT 10;
"""
df = conn.query(query, params={"day": d.isoformat()})

# Timestamps are treated as UTC and converted to the local zone for display.
df['ts'] = pandas.to_datetime(df['ts'])
df['ts'] = df['ts'].dt.tz_localize(pytz.utc).dt.tz_convert(local_tz)
st.dataframe(df)