192 lines
6.4 KiB
Python
Executable file
192 lines
6.4 KiB
Python
Executable file
#!/usr/bin/env python3
|
|
# megolm_filter.py: operate on megolm session data
|
|
# Copyright (C) 2019 Aleksa Sarai <cyphar@cyphar.com>
|
|
# Copyright (C) 2025 Lain Iwakura <lain@serialexperiments.club>
|
|
#
|
|
# This program is free software: you can redistribute it and/or modify
|
|
# it under the terms of the GNU General Public License as published by
|
|
# the Free Software Foundation, either version 3 of the License, or
|
|
# (at your option) any later version.
|
|
#
|
|
# This program is distributed in the hope that it will be useful,
|
|
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
# GNU General Public License for more details.
|
|
#
|
|
# You should have received a copy of the GNU General Public License
|
|
# along with this program. If not, see <https://www.gnu.org/licenses/>.
|
|
|
|
import argparse
|
|
import base64
|
|
import getpass
|
|
import hashlib
|
|
import hmac
|
|
import json
|
|
import struct
|
|
import sys
|
|
|
|
from Crypto.Cipher import AES
|
|
from Crypto.Random import get_random_bytes
|
|
from Crypto.Util import Counter
|
|
|
|
# This parsing is from the spec:
|
|
# <https://github.com/matrix-org/matrix-doc/blob/master/specification/modules/end_to_end_encryption.rst#key-exports>
|
|
#
|
|
# Given a passphrase, we have
|
|
# {K, K'} = PBKDF2(HMAC-SHA-256, passphrase, S, N, 512)
|
|
# where K is the first 256 bits and K' the last 256 bits.
|
|
#
|
|
# Size | Description
|
|
# -----+------------------------------------------
|
|
# 1 | Export format version, which must be 0x01.
|
|
# 16 | The salt S.
|
|
# 16 | The initialization vector IV.
|
|
# 4 | The number of rounds N, as a big-endian unsigned 32-bit integer.
|
|
# var | The encrypted JSON object.
|
|
# 32 | The HMAC-SHA-256 of all the above string concatenated together,
|
|
# | using K' as the key.
|
|
|
|
HEADER = b"-----BEGIN MEGOLM SESSION DATA-----"
|
|
FOOTER = b"-----END MEGOLM SESSION DATA-----"
|
|
|
|
# XXX: It kinda sucks you can't have 16-byte bigints with Python's struct...
|
|
CryptoParams = struct.Struct(">c16s16sL")
|
|
MAC_SIZE = 32
|
|
|
|
|
|
def bail(*args):
|
|
print("[!]", *args, file=sys.stderr)
|
|
sys.exit(1)
|
|
|
|
|
|
# A bytes-friendly version of textwrap.fill.
|
|
def bytes_wrap(b, width):
|
|
wrapped = []
|
|
while b:
|
|
wrapped.append(b[:width])
|
|
b = b[width:]
|
|
return b"\n".join(wrapped)
|
|
|
|
|
|
# Short-hand for the PBKDF2 and split we need for K and K'.
|
|
def stretch_keys(passphrase, S, N):
|
|
if not isinstance(passphrase, bytes):
|
|
passphrase = passphrase.encode("utf-8")
|
|
keys = hashlib.pbkdf2_hmac("sha512", passphrase, S, N, dklen=512 // 8)
|
|
return (keys[:256 // 8], keys[256 // 8:])
|
|
|
|
|
|
def enc_session_data(passphrase, json_data):
|
|
# Figure out our parameters.
|
|
version, S, IV, N = b"\x01", get_random_bytes(16), get_random_bytes(16), 500000
|
|
|
|
# Clear bit 63 of IV -- apparently this is required to work around a quirk
|
|
# of the Android AES-CTR's counter implementation.
|
|
IV = int.from_bytes(IV, byteorder="big") & ~(1 << 63)
|
|
|
|
# Get our keys.
|
|
K, Kp = stretch_keys(passphrase, S, N)
|
|
|
|
# Encrypt the JSON.
|
|
ctr = Counter.new(128, initial_value=IV)
|
|
cipher = AES.new(K, AES.MODE_CTR, counter=ctr)
|
|
plaintext = json_data
|
|
ciphertext = cipher.encrypt(plaintext)
|
|
|
|
# Prepend the crypto parameters.
|
|
params = CryptoParams.pack(version, S, IV.to_bytes(16, "big"), N)
|
|
body = params + ciphertext
|
|
|
|
# Compute the MAC.
|
|
body += hmac.digest(Kp, body, "sha256")
|
|
|
|
# Base64 everything, wrap it at 128-chars, and add the header+footer.
|
|
session_data = bytes_wrap(base64.b64encode(body), 128)
|
|
return b"\n".join([HEADER, session_data, FOOTER])
|
|
|
|
|
|
def dec_session_data(passphrase, session_data):
|
|
# Get rid of any trailing newlines.
|
|
session_data = session_data.strip()
|
|
|
|
# Does it have the header and footer?
|
|
if not session_data.startswith(HEADER):
|
|
bail("session data invalid: missing header %r" % (HEADER,))
|
|
if not session_data.endswith(FOOTER):
|
|
bail("session data invalid: missing footer %r" % (FOOTER,))
|
|
|
|
# Get the body and base64-decode it.
|
|
body = base64.b64decode(session_data[len(HEADER):-len(FOOTER)])
|
|
|
|
if len(body) < CryptoParams.size + MAC_SIZE:
|
|
bail("session data invalid: data packet too small")
|
|
|
|
# Get the parameters (we need S and N to check the MAC).
|
|
params = body[:CryptoParams.size]
|
|
version, S, IV, N = CryptoParams.unpack(params)
|
|
IV = int.from_bytes(IV, byteorder="big")
|
|
|
|
# Figure out the keys.
|
|
K, Kp = stretch_keys(passphrase, S, N)
|
|
|
|
# Check the MAC.
|
|
mac = body[-MAC_SIZE:]
|
|
our_mac = hmac.digest(Kp, body[:-MAC_SIZE], "sha256")
|
|
if not hmac.compare_digest(mac, our_mac):
|
|
bail("session data corrupted or bad passphrase: mac check failed")
|
|
|
|
# Okay, decrypt the JSON.
|
|
ctr = Counter.new(128, initial_value=IV)
|
|
cipher = AES.new(K, AES.MODE_CTR, counter=ctr)
|
|
ciphertext = body[CryptoParams.size:-MAC_SIZE]
|
|
return cipher.decrypt(ciphertext)
|
|
|
|
|
|
def main():
|
|
parser = argparse.ArgumentParser(description="Operate on megolm session backups.")
|
|
parser.add_argument("file", nargs='?', help="megolm session data")
|
|
parser.add_argument("room_ids", nargs='*', help="Room id to filter (optional)")
|
|
parser.add_argument("-o", "--output", help="Output to file")
|
|
parser.add_argument("-p", "--plain", dest="mode", const="plain", action="store_const",
|
|
help="Returns the plain unencrypted content")
|
|
args = parser.parse_args()
|
|
|
|
if not args.mode:
|
|
args.mode = "encrypted"
|
|
|
|
if not args.file:
|
|
bail("You must specify a file")
|
|
|
|
with open(args.file, "rb") as f:
|
|
data = f.read()
|
|
|
|
# Wait until after reading input to get the passphrase so pipelines work
|
|
# properly. This results in slightly strange behaviour for interactive
|
|
# uses, but most people will be using this in a pipeline.
|
|
passphrase = getpass.getpass(f"Backup passphrase [mode={args.mode}]: ")
|
|
decrypted_data = dec_session_data(passphrase, data)
|
|
|
|
if args.room_ids:
|
|
json_data = json.loads(decrypted_data)
|
|
filtered_data = [ key for key in json_data if key['room_id'] in args.room_ids ]
|
|
|
|
if not filtered_data:
|
|
bail(f"No keys found for room {args.room_id}")
|
|
|
|
decrypted_data = str(filtered_data).encode()
|
|
|
|
output = decrypted_data
|
|
|
|
if args.mode == "encrypted":
|
|
output = enc_session_data(passphrase, output)
|
|
|
|
if args.output:
|
|
with open(args.output, "wb") as file:
|
|
file.write(output + b"\n")
|
|
file.flush()
|
|
else:
|
|
print(f"\x1b[1A{output.decode()}")
|
|
|
|
|
|
if __name__ == "__main__":
|
|
main()
|