sanatize identation

This commit is contained in:
Lain Iwakura 2025-01-24 15:22:11 -03:00
parent b380eb61cf
commit 5baf898eaf
Signed by: lain
GPG key ID: 89686F4239E80508

View file

@ -15,17 +15,17 @@
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
import sys
import hmac
import argparse
import base64
import struct
import getpass
import hashlib
import argparse
import hmac
import struct
import sys
from Crypto.Util import Counter
from Crypto.Cipher import AES
from Crypto.Random import get_random_bytes
from Crypto.Util import Counter
# This parsing is from the spec:
# <https://github.com/matrix-org/matrix-doc/blob/master/specification/modules/end_to_end_encryption.rst#key-exports>
@ -51,120 +51,129 @@ FOOTER = b"-----END MEGOLM SESSION DATA-----"
CryptoParams = struct.Struct(">c16s16sL")
MAC_SIZE = 32
def bail(*args):
print("[!]", *args, file=sys.stderr)
sys.exit(1)
print("[!]", *args, file=sys.stderr)
sys.exit(1)
# A bytes-friendly version of textwrap.fill.
def bytes_wrap(b, width):
wrapped = []
while b:
wrapped.append(b[:width])
b = b[width:]
return b"\n".join(wrapped)
wrapped = []
while b:
wrapped.append(b[:width])
b = b[width:]
return b"\n".join(wrapped)
# Short-hand for the PBKDF2 and split we need for K and K'.
def stretch_keys(passphrase, S, N):
if not isinstance(passphrase, bytes):
passphrase = passphrase.encode("utf-8")
keys = hashlib.pbkdf2_hmac("sha512", passphrase, S, N, dklen=512//8)
return (keys[:256//8], keys[256//8:])
if not isinstance(passphrase, bytes):
passphrase = passphrase.encode("utf-8")
keys = hashlib.pbkdf2_hmac("sha512", passphrase, S, N, dklen=512 // 8)
return (keys[:256 // 8], keys[256 // 8:])
def enc_session_data(passphrase, json_data):
# Figure out our parameters.
version, S, IV, N = b"\x01", get_random_bytes(16), get_random_bytes(16), 500000
# Figure out our parameters.
version, S, IV, N = b"\x01", get_random_bytes(16), get_random_bytes(16), 500000
# Clear bit 63 of IV -- apparently this is required to work around a quirk
# of the Android AES-CTR's counter implementation.
IV = int.from_bytes(IV, byteorder="big") & ~(1 << 63)
# Clear bit 63 of IV -- apparently this is required to work around a quirk
# of the Android AES-CTR's counter implementation.
IV = int.from_bytes(IV, byteorder="big") & ~(1 << 63)
# Get our keys.
K, Kp = stretch_keys(passphrase, S, N)
# Get our keys.
K, Kp = stretch_keys(passphrase, S, N)
# Encrypt the JSON.
ctr = Counter.new(128, initial_value=IV)
cipher = AES.new(K, AES.MODE_CTR, counter=ctr)
plaintext = json_data
ciphertext = cipher.encrypt(plaintext)
# Encrypt the JSON.
ctr = Counter.new(128, initial_value=IV)
cipher = AES.new(K, AES.MODE_CTR, counter=ctr)
plaintext = json_data
ciphertext = cipher.encrypt(plaintext)
# Prepend the crypto parameters.
params = CryptoParams.pack(version, S, IV.to_bytes(16, "big"), N)
body = params + ciphertext
# Prepend the crypto parameters.
params = CryptoParams.pack(version, S, IV.to_bytes(16, "big"), N)
body = params + ciphertext
# Compute the MAC.
body += hmac.digest(Kp, body, "sha256")
# Compute the MAC.
body += hmac.digest(Kp, body, "sha256")
# Base64 everything, wrap it at 128-chars, and add the header+footer.
session_data = bytes_wrap(base64.b64encode(body), 128)
return b"\n".join([HEADER, session_data, FOOTER])
# Base64 everything, wrap it at 128-chars, and add the header+footer.
session_data = bytes_wrap(base64.b64encode(body), 128)
return b"\n".join([HEADER, session_data, FOOTER])
def dec_session_data(passphrase, session_data):
# Get rid of any trailing newlines.
session_data = session_data.strip()
# Get rid of any trailing newlines.
session_data = session_data.strip()
# Does it have the header and footer?
if not session_data.startswith(HEADER):
bail("session data invalid: missing header %r" % (HEADER,))
if not session_data.endswith(FOOTER):
bail("session data invalid: missing footer %r" % (FOOTER,))
# Does it have the header and footer?
if not session_data.startswith(HEADER):
bail("session data invalid: missing header %r" % (HEADER,))
if not session_data.endswith(FOOTER):
bail("session data invalid: missing footer %r" % (FOOTER,))
# Get the body and base64-decode it.
body = base64.b64decode(session_data[len(HEADER):-len(FOOTER)])
# Get the body and base64-decode it.
body = base64.b64decode(session_data[len(HEADER):-len(FOOTER)])
if len(body) < CryptoParams.size + MAC_SIZE:
bail("session data invalid: data packet too small")
if len(body) < CryptoParams.size + MAC_SIZE:
bail("session data invalid: data packet too small")
# Get the parameters (we need S and N to check the MAC).
params = body[:CryptoParams.size]
version, S, IV, N = CryptoParams.unpack(params)
IV = int.from_bytes(IV, byteorder="big")
# Get the parameters (we need S and N to check the MAC).
params = body[:CryptoParams.size]
version, S, IV, N = CryptoParams.unpack(params)
IV = int.from_bytes(IV, byteorder="big")
# Figure out the keys.
K, Kp = stretch_keys(passphrase, S, N)
# Figure out the keys.
K, Kp = stretch_keys(passphrase, S, N)
# Check the MAC.
mac = body[-MAC_SIZE:]
our_mac = hmac.digest(Kp, body[:-MAC_SIZE], "sha256")
if not hmac.compare_digest(mac, our_mac):
bail("session data corrupted or bad passphrase: mac check failed")
# Check the MAC.
mac = body[-MAC_SIZE:]
our_mac = hmac.digest(Kp, body[:-MAC_SIZE], "sha256")
if not hmac.compare_digest(mac, our_mac):
bail("session data corrupted or bad passphrase: mac check failed")
# Okay, decrypt the JSON.
ctr = Counter.new(128, initial_value=IV)
cipher = AES.new(K, AES.MODE_CTR, counter=ctr)
ciphertext = body[CryptoParams.size:-MAC_SIZE]
return cipher.decrypt(ciphertext)
# Okay, decrypt the JSON.
ctr = Counter.new(128, initial_value=IV)
cipher = AES.new(K, AES.MODE_CTR, counter=ctr)
ciphertext = body[CryptoParams.size:-MAC_SIZE]
return cipher.decrypt(ciphertext)
def main(args):
parser = argparse.ArgumentParser(description="Operate on megolm session backups.")
parser.add_argument("file", nargs="?", default="-", help="Input text file (- for stdin).")
parser.add_argument("-o", "--output", default="-", required=False, help="Output text file (- for stdout).")
mode_group = parser.add_mutually_exclusive_group(required=True)
mode_group.add_argument("--into", dest="mode", const="encrypt", action="store_const", help="Encrypt and represent file as a megolm session backup.")
mode_group.add_argument("--from", dest="mode", const="decrypt", action="store_const", help="Decrypt the given megolm session and output the contents.")
args = parser.parse_args(args)
parser = argparse.ArgumentParser(description="Operate on megolm session backups.")
parser.add_argument("file", nargs="?", default="-", help="Input text file (- for stdin).")
parser.add_argument("-o", "--output", default="-", required=False, help="Output text file (- for stdout).")
mode_group = parser.add_mutually_exclusive_group(required=True)
mode_group.add_argument("--into", dest="mode", const="encrypt", action="store_const",
help="Encrypt and represent file as a megolm session backup.")
mode_group.add_argument("--from", dest="mode", const="decrypt", action="store_const",
help="Decrypt the given megolm session and output the contents.")
args = parser.parse_args(args)
if args.file == "-":
args.file = "/dev/stdin"
if args.output == "-":
args.output = "/dev/stdout"
if args.file == "-":
args.file = "/dev/stdin"
if args.output == "-":
args.output = "/dev/stdout"
action = {
"encrypt": enc_session_data,
"decrypt": dec_session_data,
}[args.mode]
action = {
"encrypt": enc_session_data,
"decrypt": dec_session_data,
}[args.mode]
with open(args.file, "rb") as f:
data = f.read()
with open(args.file, "rb") as f:
data = f.read()
# Wait until after reading input to get the passphrase so pipelines work
# properly. This results in slightly strange behaviour for interactive
# uses, but most people will be using this in a pipeline.
passphrase = getpass.getpass("Backup passphrase [mode=%s]: " % (args.mode,))
output = action(passphrase, data)
# Wait until after reading input to get the passphrase so pipelines work
# properly. This results in slightly strange behaviour for interactive
# uses, but most people will be using this in a pipeline.
passphrase = getpass.getpass("Backup passphrase [mode=%s]: " % (args.mode,))
output = action(passphrase, data)
with open(args.output, "wb") as f:
f.write(output + b"\n")
f.flush()
with open(args.output, "wb") as f:
f.write(output + b"\n")
f.flush()
if __name__ == "__main__":
main(sys.argv[1:])
main(sys.argv[1:])