megolm_filter/megolm_filter.py
2025-01-24 17:00:07 -03:00

192 lines
6.4 KiB
Python
Executable file

#!/usr/bin/env python3
# megolm_filter.py: operate on megolm session data
# Copyright (C) 2019 Aleksa Sarai <cyphar@cyphar.com>
# Copyright (C) 2025 Lain Iwakura <lain@serialexperiments.club>
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
import argparse
import base64
import getpass
import hashlib
import hmac
import json
import struct
import sys
from Crypto.Cipher import AES
from Crypto.Random import get_random_bytes
from Crypto.Util import Counter
# This parsing is from the spec:
# <https://github.com/matrix-org/matrix-doc/blob/master/specification/modules/end_to_end_encryption.rst#key-exports>
#
# Given a passphrase, we have
# {K, K'} = PBKDF2(HMAC-SHA-512, passphrase, S, N, 512)
# where K is the first 256 bits and K' the last 256 bits.
#
# Size | Description
# -----+------------------------------------------
# 1 | Export format version, which must be 0x01.
# 16 | The salt S.
# 16 | The initialization vector IV.
# 4 | The number of rounds N, as a big-endian unsigned 32-bit integer.
# var | The encrypted JSON object.
# 32 | The HMAC-SHA-256 of all the above string concatenated together,
# | using K' as the key.
# ASCII-armour delimiters that enclose the base64-encoded export body.
HEADER = b"-----BEGIN MEGOLM SESSION DATA-----"
FOOTER = b"-----END MEGOLM SESSION DATA-----"
# Fixed-size prefix of the decoded body, packed big-endian: a 1-byte version,
# a 16-byte salt, a 16-byte IV and a 32-bit unsigned round count.
# XXX: It kinda sucks you can't have 16-byte bigints with Python's struct...
CryptoParams = struct.Struct(">c16s16sL")
# Length in bytes of the HMAC-SHA-256 tag that trails the decoded body.
MAC_SIZE = 32
def bail(*args):
    """Report a fatal error on stderr and terminate with exit status 1.

    The arguments are stringified and joined with single spaces after a
    leading ``[!]`` marker, exactly as ``print`` would render them.
    """
    message = " ".join(str(part) for part in ("[!]",) + args)
    sys.stderr.write(message + "\n")
    raise SystemExit(1)
# A bytes-friendly version of textwrap.fill.
def bytes_wrap(b, width):
    """Split *b* into chunks of at most *width* bytes joined by newlines.

    An empty input yields ``b""``; the final chunk may be shorter than
    *width*.
    """
    lines = []
    remaining = b
    while remaining:
        # Peel one line off the front and keep the rest for the next pass.
        head, remaining = remaining[:width], remaining[width:]
        lines.append(head)
    return b"\n".join(lines)
# Short-hand for the PBKDF2 and split we need for K and K'.
def stretch_keys(passphrase, S, N):
    """Derive the key pair ``(K, K')`` from a passphrase.

    Runs PBKDF2-HMAC-SHA-512 over *passphrase* with salt *S* for *N* rounds
    to produce 512 bits, then splits the result in half.

    Args:
        passphrase: The passphrase as ``bytes``, or a ``str`` that is
            encoded as UTF-8 first.
        S: Salt bytes.
        N: Number of PBKDF2 rounds.

    Returns:
        A ``(K, K')`` tuple: the first 256 bits and the last 256 bits.
    """
    key_len = 256 // 8
    secret = passphrase if isinstance(passphrase, bytes) else passphrase.encode("utf-8")
    derived = hashlib.pbkdf2_hmac("sha512", secret, S, N, dklen=2 * key_len)
    first_half = derived[:key_len]
    second_half = derived[key_len:]
    return (first_half, second_half)
def enc_session_data(passphrase, json_data):
    """Encrypt *json_data* into an armoured megolm session export.

    Args:
        passphrase: Passphrase handed to ``stretch_keys`` to derive K and K'.
        json_data: The plaintext bytes to encrypt.

    Returns:
        ``bytes`` made of the header line, the base64 body wrapped at 128
        characters, and the footer line, joined by newlines.
    """
    # Fresh parameters for this export: random salt, random IV, fixed rounds.
    rounds = 500000
    salt = get_random_bytes(16)
    # Bit 63 of the IV is cleared -- apparently required to work around a
    # quirk of the Android AES-CTR counter implementation.
    iv_int = int.from_bytes(get_random_bytes(16), byteorder="big") & ~(1 << 63)

    enc_key, mac_key = stretch_keys(passphrase, salt, rounds)

    # AES-CTR over the plaintext with a 128-bit counter seeded from the IV.
    counter = Counter.new(128, initial_value=iv_int)
    ciphertext = AES.new(enc_key, AES.MODE_CTR, counter=counter).encrypt(json_data)

    # The MAC covers the packed parameters followed by the ciphertext.
    header = CryptoParams.pack(b"\x01", salt, iv_int.to_bytes(16, "big"), rounds)
    payload = header + ciphertext
    tag = hmac.digest(mac_key, payload, "sha256")

    # Base64 everything, wrap at 128 characters, add the armour lines.
    armoured = bytes_wrap(base64.b64encode(payload + tag), 128)
    return b"\n".join([HEADER, armoured, FOOTER])
def dec_session_data(passphrase, session_data):
    """Decrypt an armoured megolm session export.

    Args:
        passphrase: Passphrase handed to ``stretch_keys`` to derive K and K'.
        session_data: The armoured export as ``bytes`` (header line, base64
            body, footer line); surrounding whitespace is ignored.

    Returns:
        The decrypted payload as ``bytes``.

    Any validation failure (missing header/footer, malformed base64, short
    packet, unsupported version, MAC mismatch) is reported through ``bail``,
    which terminates the process.
    """
    # Get rid of any surrounding whitespace (e.g. trailing newlines).
    session_data = session_data.strip()

    # Does it have the header and footer?
    if not session_data.startswith(HEADER):
        bail("session data invalid: missing header %r" % (HEADER,))
    if not session_data.endswith(FOOTER):
        bail("session data invalid: missing footer %r" % (FOOTER,))

    # Get the body and base64-decode it. binascii.Error is a ValueError, so
    # malformed input is reported like every other input error instead of
    # surfacing as a traceback.
    try:
        body = base64.b64decode(session_data[len(HEADER):-len(FOOTER)])
    except ValueError:
        bail("session data invalid: body is not valid base64")
    if len(body) < CryptoParams.size + MAC_SIZE:
        bail("session data invalid: data packet too small")

    # Get the parameters (we need S and N to check the MAC).
    version, S, IV, N = CryptoParams.unpack_from(body)
    # The export format only defines version 0x01; reject anything else
    # before spending time on key stretching.
    if version != b"\x01":
        bail("session data invalid: unsupported export format version %r" % (version,))
    IV = int.from_bytes(IV, byteorder="big")

    # Figure out the keys.
    K, Kp = stretch_keys(passphrase, S, N)

    # Check the MAC, which covers everything before the trailing tag.
    mac = body[-MAC_SIZE:]
    our_mac = hmac.digest(Kp, body[:-MAC_SIZE], "sha256")
    if not hmac.compare_digest(mac, our_mac):
        bail("session data corrupted or bad passphrase: mac check failed")

    # Okay, decrypt the JSON.
    ctr = Counter.new(128, initial_value=IV)
    cipher = AES.new(K, AES.MODE_CTR, counter=ctr)
    ciphertext = body[CryptoParams.size:-MAC_SIZE]
    return cipher.decrypt(ciphertext)
def main():
    """Command-line entry point.

    Reads an armoured megolm export from the given file, prompts for the
    passphrase, decrypts it, optionally keeps only the sessions whose
    ``room_id`` is one of the requested room ids, and then emits the result
    either re-encrypted with the same passphrase (default) or as plaintext
    (``--plain``). Output goes to ``--output`` if given, otherwise to stdout.
    """
    parser = argparse.ArgumentParser(description="Operate on megolm session backups.")
    parser.add_argument("file", nargs='?', help="megolm session data")
    parser.add_argument("room_ids", nargs='*', help="Room id to filter (optional)")
    parser.add_argument("-o", "--output", help="Output to file")
    parser.add_argument("-p", "--plain", dest="mode", const="plain", action="store_const",
                        help="Returns the plain unencrypted content")
    args = parser.parse_args()

    if not args.mode:
        args.mode = "encrypted"
    if not args.file:
        bail("You must specify a file")

    with open(args.file, "rb") as f:
        data = f.read()

    # Wait until after reading input to get the passphrase so pipelines work
    # properly. This results in slightly strange behaviour for interactive
    # uses, but most people will be using this in a pipeline.
    passphrase = getpass.getpass(f"Backup passphrase [mode={args.mode}]: ")
    decrypted_data = dec_session_data(passphrase, data)

    if args.room_ids:
        wanted_rooms = set(args.room_ids)
        json_data = json.loads(decrypted_data)
        filtered_data = [key for key in json_data if key["room_id"] in wanted_rooms]
        if not filtered_data:
            bail(f"No keys found for room(s) {', '.join(args.room_ids)}")
        # Serialise with json.dumps, not str(): the Python repr of a list of
        # dicts (single quotes, True/None) is not valid JSON.
        decrypted_data = json.dumps(filtered_data).encode("utf-8")

    output = decrypted_data
    if args.mode == "encrypted":
        output = enc_session_data(passphrase, output)

    if args.output:
        with open(args.output, "wb") as file:
            file.write(output + b"\n")
    else:
        # The leading escape sequence moves the cursor up one line before
        # the output is printed.
        print(f"\x1b[1A{output.decode()}")
# Run the command-line interface only when executed as a script.
if __name__ == "__main__":
    main()