#!/usr/bin/env python3
# megolm_filter.py: operate on megolm session data
# Copyright (C) 2019 Aleksa Sarai
# Copyright (C) 2025 Lain Iwakura
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program.  If not, see <https://www.gnu.org/licenses/>.

import argparse
import base64
import getpass
import hashlib
import hmac
import json
import struct
import sys

from Crypto.Cipher import AES
from Crypto.Random import get_random_bytes
from Crypto.Util import Counter

# This parsing is from the spec:
#   NOTE(review): the reference link is missing from this copy of the file;
#   presumably the "Key exports" section of the Matrix client-server
#   specification -- confirm and restore the URL.
#
# Given a passphrase, we have
#   {K, K'} = PBKDF2(HMAC-SHA-512, passphrase, S, N, 512)
# where K is the first 256 bits and K' the last 256 bits.
# (stretch_keys() in this file derives the keys with "sha512"; only the
# trailing MAC described below uses SHA-256.)
#
#  Size | Description
#  -----+------------------------------------------
#     1 | Export format version, which must be 0x01.
#    16 | The salt S.
#    16 | The initialization vector IV.
#     4 | The number of rounds N, as a big-endian unsigned 32-bit integer.
#   var | The encrypted JSON object.
#    32 | The HMAC-SHA-256 of all the above string concatenated together,
#       | using K' as the key.

# ASCII armour lines that surround the base64 payload.
HEADER = b"-----BEGIN MEGOLM SESSION DATA-----"
FOOTER = b"-----END MEGOLM SESSION DATA-----"

# XXX: It kinda sucks you can't have 16-byte bigints with Python's struct...
# Big-endian, unpadded layout: version (1 byte), S (16), IV (16), N (uint32).
CryptoParams = struct.Struct(">c16s16sL")
# Length in bytes of the trailing HMAC-SHA-256.
MAC_SIZE = 32


def bail(*args):
    """Print *args* to stderr prefixed with "[!]" and exit with status 1."""
    print("[!]", *args, file=sys.stderr)
    sys.exit(1)


# A bytes-friendly version of textwrap.fill.
def bytes_wrap(b, width):
    """Break *b* into chunks of at most *width* bytes joined by b"\\n".

    Args:
        b: the bytes to wrap.
        width: maximum chunk length; must be positive.

    Returns:
        The wrapped bytes (empty input yields empty output).

    Raises:
        ValueError: if *width* is not positive (no chunking could ever
            consume the input).
    """
    if width <= 0:
        raise ValueError("width must be a positive integer")
    # Step through by index instead of repeatedly re-slicing the remainder,
    # which would copy the tail of the buffer on every iteration.
    return b"\n".join(b[i:i + width] for i in range(0, len(b), width))


# Short-hand for the PBKDF2 and split we need for K and K'.
def stretch_keys(passphrase, S, N):
    """Derive the key pair (K, K') from *passphrase*.

    Args:
        passphrase: str (encoded as UTF-8) or bytes.
        S: the salt bytes.
        N: the PBKDF2 iteration count.

    Returns:
        A tuple (K, K'): the first and last 32 bytes of a 64-byte
        PBKDF2-HMAC-SHA-512 output.
    """
    if not isinstance(passphrase, bytes):
        passphrase = passphrase.encode("utf-8")
    keys = hashlib.pbkdf2_hmac("sha512", passphrase, S, N, dklen=512 // 8)
    half = 256 // 8
    return (keys[:half], keys[half:])


def enc_session_data(passphrase, json_data):
    """Encrypt *json_data* (bytes) into an armoured megolm session export.

    Args:
        passphrase: str or bytes passphrase handed to stretch_keys().
        json_data: the plaintext bytes to encrypt.

    Returns:
        bytes consisting of HEADER, the base64 body wrapped at 128
        characters, and FOOTER, joined by newlines.
    """
    # Figure out our parameters.
    version = b"\x01"
    S = get_random_bytes(16)
    IV = get_random_bytes(16)
    N = 500000

    # Clear bit 63 of IV -- apparently this is required to work around a quirk
    # of the Android AES-CTR's counter implementation.
    IV = int.from_bytes(IV, byteorder="big") & ~(1 << 63)

    # Get our keys.
    K, Kp = stretch_keys(passphrase, S, N)

    # Encrypt the JSON.
    ctr = Counter.new(128, initial_value=IV)
    cipher = AES.new(K, AES.MODE_CTR, counter=ctr)
    ciphertext = cipher.encrypt(json_data)

    # Prepend the crypto parameters.
    params = CryptoParams.pack(version, S, IV.to_bytes(16, "big"), N)
    body = params + ciphertext

    # Compute the MAC over everything so far and append it.
    body += hmac.digest(Kp, body, "sha256")

    # Base64 everything, wrap it at 128-chars, and add the header+footer.
    session_data = bytes_wrap(base64.b64encode(body), 128)
    return b"\n".join([HEADER, session_data, FOOTER])


def dec_session_data(passphrase, session_data):
    """Verify and decrypt an armoured megolm session export.

    Args:
        passphrase: str or bytes passphrase handed to stretch_keys().
        session_data: the raw export bytes, including header and footer.

    Returns:
        The decrypted plaintext bytes.

    Any validation failure (missing armour, undecodable base64, short
    packet, unknown version, MAC mismatch) is reported through bail(),
    which terminates the process.
    """
    # Get rid of any trailing newlines.
    session_data = session_data.strip()

    # Does it have the header and footer?
    if not session_data.startswith(HEADER):
        bail("session data invalid: missing header %r" % (HEADER,))
    if not session_data.endswith(FOOTER):
        bail("session data invalid: missing footer %r" % (FOOTER,))

    # Get the body and base64-decode it. binascii.Error is a ValueError
    # subclass, so a malformed body is reported instead of a traceback.
    try:
        body = base64.b64decode(session_data[len(HEADER):-len(FOOTER)])
    except ValueError:
        bail("session data invalid: body is not valid base64")
    if len(body) < CryptoParams.size + MAC_SIZE:
        bail("session data invalid: data packet too small")

    # Get the parameters (we need S and N to check the MAC).
    params = body[:CryptoParams.size]
    version, S, IV, N = CryptoParams.unpack(params)
    # The only version this file knows how to write (and read) is 0x01.
    if version != b"\x01":
        bail("session data invalid: unsupported export format version %r" % (version,))
    IV = int.from_bytes(IV, byteorder="big")

    # Figure out the keys.
    K, Kp = stretch_keys(passphrase, S, N)

    # Check the MAC (constant-time comparison).
    mac = body[-MAC_SIZE:]
    our_mac = hmac.digest(Kp, body[:-MAC_SIZE], "sha256")
    if not hmac.compare_digest(mac, our_mac):
        bail("session data corrupted or bad passphrase: mac check failed")

    # Okay, decrypt the JSON.
    ctr = Counter.new(128, initial_value=IV)
    cipher = AES.new(K, AES.MODE_CTR, counter=ctr)
    ciphertext = body[CryptoParams.size:-MAC_SIZE]
    return cipher.decrypt(ciphertext)


def main():
    """Command-line entry point.

    Reads an export file, decrypts it, optionally keeps only the keys whose
    "room_id" is one of the given room ids, and writes the result either
    re-encrypted (default) or as plaintext (-p), to a file (-o) or stdout.
    """
    parser = argparse.ArgumentParser(description="Operate on megolm session backups.")
    parser.add_argument("file", nargs='?', help="megolm session data")
    parser.add_argument("room_ids", nargs='*', help="Room id to filter (optional)")
    parser.add_argument("-o", "--output", help="Output to file")
    parser.add_argument("-p", "--plain", dest="mode", const="plain", action="store_const",
                        help="Returns the plain unencrypted content")
    args = parser.parse_args()

    if not args.mode:
        args.mode = "encrypted"

    if not args.file:
        bail("You must specify a file")

    with open(args.file, "rb") as f:
        data = f.read()

    # Wait until after reading input to get the passphrase so pipelines work
    # properly. This results in slightly strange behaviour for interactive
    # uses, but most people will be using this in a pipeline.
    passphrase = getpass.getpass(f"Backup passphrase [mode={args.mode}]: ")

    decrypted_data = dec_session_data(passphrase, data)

    if args.room_ids:
        json_data = json.loads(decrypted_data)
        wanted = set(args.room_ids)
        filtered_data = [key for key in json_data if key.get("room_id") in wanted]
        if not filtered_data:
            bail(f"No keys found for room {', '.join(args.room_ids)}")
        # Serialise as JSON: str() of a list is a Python repr, which is not
        # valid JSON and would make the re-exported file unusable.
        decrypted_data = json.dumps(filtered_data).encode()

    output = decrypted_data
    if args.mode == "encrypted":
        output = enc_session_data(passphrase, output)

    if args.output:
        with open(args.output, "wb") as out:
            out.write(output + b"\n")
    else:
        # ESC[1A moves the cursor up one line; presumably this tidies the
        # terminal after the passphrase prompt. Only emit it on a terminal so
        # piped or redirected output stays a clean export.
        if sys.stdout.isatty():
            sys.stdout.write("\x1b[1A")
        print(output.decode())


if __name__ == "__main__":
    main()