External Publication
Visit Post

Can anyone run this and see if the Pi-based Reconstruction and Archival Protocol works?

Hugging Face Forums [Unofficial] April 7, 2026
Source

#!/usr/bin/env python3
"""
Pi-RAP: Pi-based Reconstruction & Archival Protocol
Complete File System with Error Correction, Encryption, Compression
Inspired by "No Way Out" (1987) reconstruction scene

Features:

  • Pi-based sector distribution
  • Reed-Solomon error correction
  • Pi stream cipher encryption
  • Zlib compression
  • Network distribution simulation
  • Checksum verification
  • Progress tracking
"""

import hashlib
import json
import math
import os
import random
import struct
import sys
import time
import zlib
from datetime import datetime
from pathlib import Path
from typing import Dict, List, Optional, Tuple

# Try to import reedsolo for advanced error correction. The package is
# optional: when it is missing the flag below is False and a hint is printed.
try:
    import reedsolo
    REED_SOLOMON_AVAILABLE = True
except ImportError:
    REED_SOLOMON_AVAILABLE = False
    print("Note: Install 'reedsolo' for advanced error correction: pip install reedsolo")

class PiDigitGenerator:
    """Provide the decimal digits of Pi (the digits after the leading "3.").

    On construction the first 10000 decimals are computed once and stored in
    ``self.digits``; position 0 is the first decimal digit (``1``).
    """

    # First 100 decimals of Pi, served directly for small requests.
    _PI_FIRST_100 = (
        "14159265358979323846264338327950288419716939937510"
        "58209749445923078164062862089986280348253421170679"
    )

    # Extra decimal places carried during the computation so that the
    # accumulated truncation error never reaches the digits that are returned.
    _GUARD_DIGITS = 20

    def __init__(self):
        self.digits = self._calculate_pi_digits(10000)

    @staticmethod
    def _arctan_inverse(x: int, unity: int) -> int:
        """Return ``arctan(1/x) * unity`` using the alternating Taylor series.

        Everything is integer arithmetic; each term is truncated, so the
        result is accurate to within a few units per term summed.
        """
        power = unity // x
        total = power
        x_squared = x * x
        divisor = 3
        sign = -1
        while power:
            power //= x_squared
            total += sign * (power // divisor)
            sign = -sign
            divisor += 2
        return total

    @staticmethod
    def _fixed_width_decimal(value: int, width: int, chunk: int = 1000) -> str:
        """Render a non-negative int as exactly ``width`` decimal characters.

        The conversion is done ``chunk`` digits at a time because recent
        Python versions refuse ``str()`` on integers with several thousand
        digits by default.
        """
        base = 10 ** chunk
        parts = []
        remaining = width
        while remaining > 0:
            value, low = divmod(value, base)
            take = min(chunk, remaining)
            parts.append(str(low).zfill(chunk)[-take:])
            remaining -= take
        return ''.join(reversed(parts))

    def _calculate_pi_digits(self, n_digits: int) -> str:
        """Return the first ``n_digits`` decimals of Pi as a string.

        Uses Machin's formula, pi = 16*arctan(1/5) - 4*arctan(1/239), with
        big-integer arithmetic. Requests of up to 100 digits are answered
        from a pre-computed table; a non-positive request yields "".
        """
        if n_digits <= 0:
            return ""
        if n_digits <= len(self._PI_FIRST_100):
            return self._PI_FIRST_100[:n_digits]

        guard = self._GUARD_DIGITS
        unity = 10 ** (n_digits + guard)
        pi_scaled = 4 * (
            4 * self._arctan_inverse(5, unity) - self._arctan_inverse(239, unity)
        )
        # Drop the integer part (3) and the guard digits, keeping n_digits decimals.
        fraction = (pi_scaled - 3 * unity) // (10 ** guard)
        return self._fixed_width_decimal(fraction, n_digits)

    def get_digit(self, position: int) -> int:
        """Get Pi digit at specific position (0-indexed).

        Positions beyond the stored digits wrap around to the beginning.
        """
        return int(self.digits[position % len(self.digits)])

    def get_digits(self, start: int, length: int) -> str:
        """Get ``length`` consecutive Pi digits starting at ``start`` (wrapping)."""
        digits = self.digits
        count = len(digits)
        return ''.join(digits[(start + offset) % count] for offset in range(length))

class PiStreamCipher:
    """XOR "encryption" using Pi digits as the keystream.

    NOTE: this is obfuscation, not security. The keystream is derived from
    public digits, it is identical for every call with the same seed, and the
    ``% 256`` reduction of a 4-digit number is biased.
    """

    def __init__(self, pi_gen: "PiDigitGenerator", seed: int = 0):
        """Store the digit source and the starting digit position (``seed``)."""
        self.pi_gen = pi_gen
        self.seed = seed

    def _generate_keystream(self, length: int) -> bytes:
        """Generate ``length`` keystream bytes from Pi digits.

        Each byte is built from four consecutive digits, read as a decimal
        number and reduced modulo 256; the first group starts at ``self.seed``.
        """
        get_digits = self.pi_gen.get_digits
        start = self.seed
        return bytes(
            int(get_digits(start + 4 * index, 4)) % 256
            for index in range(length)
        )

    def encrypt(self, data: bytes) -> bytes:
        """Encrypt data using XOR with Pi keystream."""
        keystream = self._generate_keystream(len(data))
        return bytes(plain ^ key for plain, key in zip(data, keystream))

    def decrypt(self, data: bytes) -> bytes:
        """Decrypt data (same as encrypt for XOR cipher)."""
        return self.encrypt(data)

class ReedSolomonCodec:
    """Error correction using Reed-Solomon codes, with a parity fallback.

    When the optional ``reedsolo`` package is available an ``RSCodec`` does
    the work. Otherwise a placeholder scheme appends ``nsym`` copies of a
    one-byte sum; that fallback cannot detect or correct anything on decode,
    it only keeps the encoded size overhead comparable.
    """

    def __init__(self, nsym: int = 10):
        """
        nsym: number of error correction symbols
        More symbols = more error correction but larger overhead
        """
        self.nsym = nsym
        self.rs = None

        if REED_SOLOMON_AVAILABLE:
            try:
                self.rs = reedsolo.RSCodec(nsym)
            except Exception:
                # Any construction problem falls back to the parity scheme.
                self.rs = None

    def encode(self, data: bytes) -> bytes:
        """Add error correction codes and return the encoded bytes."""
        if self.rs is not None:
            return bytes(self.rs.encode(data))

        # Simple parity fallback
        parity = bytes([sum(data) % 256])
        return bytes(data) + parity * self.nsym

    def decode(self, data: bytes) -> bytes:
        """Remove error correction and correct errors where possible.

        If the Reed-Solomon decoder fails, the input is returned with the
        trailing ``nsym`` bytes stripped as a best effort; the result may
        then still be corrupt.
        """
        if self.rs is not None:
            try:
                decoded = self.rs.decode(data)
            except Exception:
                # Deliberate best effort: keep going with uncorrected data.
                return bytes(data[:len(data) - self.nsym])
            # Newer reedsolo releases return a tuple whose first item is the
            # message; older ones return the message itself.
            if isinstance(decoded, tuple):
                decoded = decoded[0]
            return bytes(decoded)

        # Simple parity fallback
        return bytes(data[:-self.nsym]) if self.nsym > 0 else bytes(data)

class NetworkDistributor:
    """Simulate network distribution across multiple in-memory nodes.

    ``self.nodes`` maps a node name (``node_0`` ... ``node_{n-1}``) to a dict
    of ``sector_id -> record``; each record holds the sector ``data``, a
    ``type`` of ``'primary'`` or ``'backup'`` and the sector's
    ``original_index``.
    """

    def __init__(self, num_nodes: int = 5):
        self.num_nodes = num_nodes
        self.nodes = {f"node_{i}": {} for i in range(num_nodes)}

    def distribute(self, sectors: Dict[str, dict]) -> Dict[str, Dict]:
        """Distribute sectors across network nodes.

        The primary node is chosen from the last decimal digit of the sector
        number (parsed from the sector id); the backup goes to the next node.
        With a single node only the primary copy is stored, because a backup
        on the same node would overwrite it.

        Returns a map ``sector_id -> {'primary': node, 'backup': node}``.
        Raises ValueError when there are no nodes to distribute to.
        """
        if self.num_nodes < 1:
            raise ValueError("NetworkDistributor needs at least one node")

        distribution = {}

        for sector_id, sector_data in sectors.items():
            sector_num = int(sector_id.split('_')[-1].replace('.dat', ''))
            last_digit = sector_num % 10

            primary_node = f"node_{last_digit % self.num_nodes}"
            backup_node = f"node_{(last_digit + 1) % self.num_nodes}"

            # Keep the logical position with every copy so that callers can
            # restore the original order after collecting sectors from nodes.
            original_index = sector_data.get('original_index', sector_num)

            # Store on primary node
            self.nodes[primary_node][sector_id] = {
                'data': sector_data['data'],
                'type': 'primary',
                'original_index': original_index,
            }

            # Store backup on secondary node (skipped when it is the same node)
            if backup_node != primary_node:
                self.nodes[backup_node][sector_id] = {
                    'data': sector_data['data'],
                    'type': 'backup',
                    'original_index': original_index,
                }

            distribution[sector_id] = {
                'primary': primary_node,
                'backup': backup_node,
            }

        return distribution

    def simulate_node_failure(self, failed_node: str) -> int:
        """Simulate a node going offline; return how many records it held."""
        if failed_node not in self.nodes:
            return 0
        lost_sectors = len(self.nodes[failed_node])
        self.nodes[failed_node] = {}
        return lost_sectors

    def recover_from_backup(self, lost_sectors: List[str]) -> Dict[str, dict]:
        """Recover lost sectors from backup nodes.

        The first backup copy found for each id is promoted to primary and
        its data is returned under that id. Ids without a backup are omitted.
        """
        recovered = {}

        for sector_id in lost_sectors:
            for node_data in self.nodes.values():
                record = node_data.get(sector_id)
                if record is not None and record['type'] == 'backup':
                    # Promote backup to primary
                    record['type'] = 'primary'
                    recovered[sector_id] = record['data']
                    break

        return recovered

    def get_all_sectors(self) -> Dict[str, dict]:
        """Get all primary sectors from all nodes.

        Each entry carries ``data``, the holding ``node`` and, when the
        record has one, ``original_index``.
        """
        all_sectors = {}
        for node_name, node_data in self.nodes.items():
            for sector_id, sector_info in node_data.items():
                if sector_info['type'] != 'primary':
                    continue
                entry = {
                    'data': sector_info['data'],
                    'node': node_name,
                }
                if 'original_index' in sector_info:
                    entry['original_index'] = sector_info['original_index']
                all_sectors[sector_id] = entry
        return all_sectors

class PiRAPFileSystem:
    """Main Pi-RAP File System.

    Runs an in-memory pipeline over one file: read, compress, split into
    fixed-size sectors, encrypt and add error correction per sector, scatter
    the sectors over simulated network nodes, optionally drop some of them,
    then collect, decode, reassemble, decompress and verify against the
    original SHA-256 checksum.
    """

    def __init__(self, config: Optional[dict] = None):
        """Build the pipeline components.

        ``config`` may be partial; keys that are missing take the defaults
        below. ``sector_size`` is given in bits.
        """
        defaults = {
            'sector_size': 50000,  # bits
            'compression': True,
            'encryption': True,
            'error_correction': True,
            'encryption_seed': 0,
            'network_nodes': 5,
            'redundancy_level': 2,
            'verbose': True,
        }
        self.config = {**defaults, **(config or {})}

        # Initialize components
        self.pi_gen = PiDigitGenerator()
        self.cipher = PiStreamCipher(self.pi_gen, self.config.get('encryption_seed', 0))
        self.rs_codec = ReedSolomonCodec(nsym=10 if self.config.get('error_correction') else 0)
        self.network = NetworkDistributor(self.config.get('network_nodes', 5))

        self.byte_sector_size = self.config.get('sector_size', 50000) // 8
        self.storage_map = {}
        self.file_metadata = {}
        self.distribution_map = {}

        # Statistics
        self.stats = {
            'original_size': 0,
            'compressed_size': 0,
            'encrypted_size': 0,
            'total_sectors': 0,
            'compression_ratio': 0,
            'processing_time': 0,
        }

    def log(self, message: str, level: str = "INFO"):
        """Log message with timestamp (only when the config is verbose)."""
        if self.config.get('verbose', True):
            timestamp = datetime.now().strftime("%H:%M:%S")
            print(f"[{timestamp}] [{level}] {message}")

    def progress_bar(self, current: int, total: int, prefix: str = "", length: int = 40):
        """Display progress bar on stdout; an empty job (total 0) shows 0%."""
        if not self.config.get('verbose', True):
            return

        if total > 0:
            percent = 100 * (current / total)
            filled = int(length * current // total)
        else:
            # Nothing to do: avoid dividing by zero and draw an empty bar.
            percent = 0
            filled = 0
        bar = '█' * filled + '░' * (length - filled)
        sys.stdout.write(f'\r{prefix} |{bar}| {percent:.1f}% ({current}/{total})')
        sys.stdout.flush()
        if current >= total:
            print()

    def calculate_checksum(self, data: bytes) -> str:
        """Calculate SHA-256 checksum as a hex string."""
        return hashlib.sha256(data).hexdigest()

    def compress_data(self, data: bytes) -> bytes:
        """Compress data using zlib (level 9) and record the size ratio.

        Returns the input unchanged when compression is disabled.
        """
        if not self.config.get('compression', True):
            return data

        compressed = zlib.compress(data, level=9)
        self.stats['compression_ratio'] = len(compressed) / len(data) if len(data) > 0 else 1
        return compressed

    def decompress_data(self, data: bytes) -> bytes:
        """Decompress data.

        If the bytes are not a valid zlib stream they are returned unchanged
        (best effort); the later checksum verification reports the failure.
        """
        if not self.config.get('compression', True):
            return data

        try:
            return zlib.decompress(data)
        except zlib.error as exc:
            self.log(f"Decompression failed: {exc}", "WARN")
            return data

    def read_file(self, filepath: str) -> Tuple[bytes, str]:
        """Read file and calculate checksum; returns ``(data, sha256_hex)``."""
        self.log(f"Reading file: {filepath}")

        with open(filepath, 'rb') as f:
            data = f.read()

        checksum = self.calculate_checksum(data)
        file_size = len(data)

        self.stats['original_size'] = file_size

        self.log(f"File size: {file_size:,} bytes ({file_size*8:,} bits)")
        self.log(f"SHA-256: {checksum[:32]}...")

        return data, checksum

    def create_sectors(self, data: bytes, filename: str) -> List[bytes]:
        """Break data into equally sized sectors, zero-padding the last one.

        Also records the file metadata (including the unpadded length, which
        is needed to strip the padding again on reconstruction).
        Raises ValueError when the configured sector size is below one byte.
        """
        sector_bytes = self.byte_sector_size
        if sector_bytes <= 0:
            raise ValueError("sector_size must be at least 8 bits (one byte)")

        self.log(f"Creating sectors ({sector_bytes:,} bytes each)...")

        # Pad data if needed
        original_len = len(data)
        remainder = original_len % sector_bytes
        if remainder != 0:
            data = data + b'\x00' * (sector_bytes - remainder)

        # Split into sectors
        sectors = []
        total_sectors = len(data) // sector_bytes
        verbose = self.config.get('verbose', True)

        for number, start in enumerate(range(0, len(data), sector_bytes), start=1):
            sectors.append(data[start:start + sector_bytes])
            if verbose:
                self.progress_bar(number, total_sectors, "Sectorizing")

        self.stats['total_sectors'] = len(sectors)

        # Store metadata
        self.file_metadata = {
            'filename': filename,
            'original_size': original_len,
            'padded_size': len(data),
            'num_sectors': len(sectors),
            'sector_size': sector_bytes,
            'timestamp': datetime.now().isoformat(),
            'config': self.config,
        }

        self.log(f"✓ Created {len(sectors)} sectors")

        return sectors

    def encrypt_and_encode_sectors(self, sectors: List[bytes]) -> Dict[str, dict]:
        """Encrypt sectors and add error correction.

        Returns a dict keyed by sector address
        (``Disk_<pi digit>/sector_<index>.dat``) whose values hold the encoded
        bytes plus ``original_index``, ``pi_digit``, ``disk_id`` and ``size``.
        """
        self.log("Encrypting and adding error correction...")

        use_encryption = self.config.get('encryption', True)
        use_ecc = self.config.get('error_correction', True)
        verbose = self.config.get('verbose', True)

        encrypted_sectors = {}
        total = len(sectors)

        for idx, sector in enumerate(sectors):
            payload = self.cipher.encrypt(sector) if use_encryption else sector
            encoded = self.rs_codec.encode(payload) if use_ecc else payload

            # Determine storage location using Pi
            pi_digit = self.pi_gen.get_digit(idx)
            disk_id = f"Disk_{pi_digit}"
            sector_address = f"{disk_id}/sector_{idx:06d}.dat"

            encrypted_sectors[sector_address] = {
                'data': encoded,
                'original_index': idx,
                'pi_digit': pi_digit,
                'disk_id': disk_id,
                'size': len(encoded),
            }

            if verbose:
                self.progress_bar(idx + 1, total, "Encrypting")

        self.log(f"✓ Encoded {len(encrypted_sectors)} sectors")

        return encrypted_sectors

    def scatter_sectors(self, sectors: Dict[str, dict]):
        """Distribute sectors across network and log the per-node counts."""
        self.log(f"Distributing across {self.config.get('network_nodes', 5)} nodes...")

        # Network distribution
        self.distribution_map = self.network.distribute(sectors)

        # Also store locally
        self.storage_map = sectors

        self.log(f"✓ Distributed to {len(self.distribution_map)} locations")

        # Show distribution stats
        node_counts = {}
        for dist_info in self.distribution_map.values():
            primary = dist_info['primary']
            node_counts[primary] = node_counts.get(primary, 0) + 1

        self.log("Distribution across nodes:")
        for node, count in sorted(node_counts.items()):
            self.log(f"  {node}: {count} sectors")

    def simulate_data_loss(self, loss_percentage: float = 10) -> Tuple[Dict, List]:
        """Simulate data loss for testing recovery.

        A random ``loss_percentage`` (clamped to 0-100) of the primary sector
        copies is deleted. Backup copies are left in place so that the
        recovery step has something to recover from.

        Returns ``(remaining_primary_sectors, lost_sector_ids)``.
        """
        self.log(f"⚠️  Simulating {loss_percentage}% data loss...")

        all_sectors = self.network.get_all_sectors()
        total_sectors = len(all_sectors)
        clamped = min(max(loss_percentage, 0), 100)
        num_lost = int(total_sectors * clamped / 100)

        # Randomly select sectors to lose
        lost_keys = random.sample(list(all_sectors.keys()), num_lost)

        # Remove the primary copies from the network
        for sector_id in lost_keys:
            for node_data in self.network.nodes.values():
                record = node_data.get(sector_id)
                if record is not None and record.get('type') == 'primary':
                    del node_data[sector_id]

        remaining = self.network.get_all_sectors()

        self.log(f"   Lost: {len(lost_keys)} sectors")
        self.log(f"   Remaining: {len(remaining)} sectors")

        return remaining, lost_keys

    @staticmethod
    def _sector_index(sector_id: str, sector_info: dict) -> int:
        """Return a sector's position in the file.

        Prefers the stored ``original_index``; otherwise parses the number
        out of an id such as ``Disk_3/sector_000012.dat``. Falls back to 0
        when neither is available.
        """
        index = sector_info.get('original_index')
        if index is not None:
            return index
        tail = sector_id.rsplit('_', 1)[-1]
        if tail.endswith('.dat'):
            tail = tail[:-len('.dat')]
        try:
            return int(tail)
        except ValueError:
            return 0

    def recover_and_reconstruct(self, available_sectors: Dict = None) -> bytes:
        """Recover from backups and reconstruct file.

        Sectors missing from ``available_sectors`` are looked up on every
        node (any copy, primary or backup). The sectors are then put back in
        index order, decoded, decrypted, concatenated and trimmed to the
        original (unpadded) length. If sectors remain missing a warning is
        logged and the partial data is still returned.
        """
        self.log("🔨 Recovering and reconstruct...".replace("reconstruct...", "reconstructing..."))

        if available_sectors is None:
            available_sectors = self.network.get_all_sectors()

        # Check for missing sectors
        total_expected = self.file_metadata.get('num_sectors', 0)
        missing_count = total_expected - len(available_sectors)

        if missing_count > 0:
            self.log(f"   Missing {missing_count} sectors, attempting recovery...")

            # Try to recover from backups: take the first copy found on any node
            merged = dict(available_sectors)
            for node_data in self.network.nodes.values():
                for sector_id, sector_info in node_data.items():
                    if sector_id not in merged:
                        merged[sector_id] = sector_info
            available_sectors = merged

            still_missing = total_expected - len(available_sectors)
            if still_missing > 0:
                self.log(f"   {still_missing} sectors could not be recovered", "WARN")

        # Sort sectors by index
        sorted_sectors = sorted(
            available_sectors.items(),
            key=lambda item: self._sector_index(item[0], item[1])
        )

        use_ecc = self.config.get('error_correction', True)
        use_encryption = self.config.get('encryption', True)

        # Reconstruct data
        pieces = []
        for _sector_id, sector_info in sorted_sectors:
            payload = sector_info['data']
            if use_ecc:
                payload = self.rs_codec.decode(payload)
            if use_encryption:
                payload = self.cipher.decrypt(payload)
            pieces.append(bytes(payload))
        reconstructed_data = b''.join(pieces)

        # Remove padding
        original_size = self.file_metadata.get('original_size', len(reconstructed_data))
        reconstructed_data = reconstructed_data[:original_size]

        self.log(f"✓ Reconstructed {len(reconstructed_data):,} bytes")

        return reconstructed_data

    def verify_integrity(self, data: bytes, original_checksum: str) -> bool:
        """Verify reconstructed data matches original (SHA-256 comparison)."""
        self.log("✅ Verifying integrity...")

        reconstructed_checksum = self.calculate_checksum(data)

        self.log(f"   Original:      {original_checksum}")
        self.log(f"   Reconstructed: {reconstructed_checksum}")

        matches = reconstructed_checksum == original_checksum
        if matches:
            self.log("🎉 SUCCESS! Checksum MATCH!")
        else:
            self.log("❌ FAILURE! Checksum mismatch!")
        return matches

    def save_file(self, data: bytes, output_path: str):
        """Save reconstructed file to ``output_path``."""
        self.log(f"💾 Saving to: {output_path}")

        with open(output_path, 'wb') as f:
            f.write(data)

        file_size = os.path.getsize(output_path)
        self.log(f"✓ Saved {file_size:,} bytes")

    def process_file(self, input_path: str, output_path: str = None,
                     simulate_loss: float = 0) -> bool:
        """Complete pipeline: Read → Process → Scatter → Recover → Verify.

        ``output_path`` defaults to ``reconstructed_<input basename>``.
        ``simulate_loss`` is the percentage of sectors to drop before
        recovery. The output file is written only when verification
        succeeds. Returns True on success; any exception is logged with a
        traceback and reported as False.
        """

        start_time = time.time()

        if output_path is None:
            output_path = "reconstructed_" + os.path.basename(input_path)

        print("\n" + "="*70)
        print("🥧 Pi-RAP FILE SYSTEM v2.0 - COMPLETE EDITION")
        print("="*70)
        print(f"Input:  {input_path}")
        print(f"Output: {output_path}")
        print(f"Config: Compression={self.config.get('compression')}, " +
              f"Encryption={self.config.get('encryption')}, " +
              f"Error Correction={self.config.get('error_correction')}")
        print("="*70)

        try:
            # Step 1: Read file
            original_data, checksum = self.read_file(input_path)

            # Step 2: Compress
            if self.config.get('compression', True):
                self.log("Compressing data...")
                compressed_data = self.compress_data(original_data)
                self.stats['compressed_size'] = len(compressed_data)
                self.log(f"✓ Compressed: {len(original_data):,} → {len(compressed_data):,} bytes " +
                        f"({self.stats['compression_ratio']*100:.1f}%)")
            else:
                compressed_data = original_data
                # Keep the summary meaningful when compression is off.
                self.stats['compressed_size'] = len(original_data)
                self.stats['compression_ratio'] = 1

            # Step 3: Create sectors
            sectors = self.create_sectors(compressed_data, input_path)

            # Step 4: Encrypt and encode
            encrypted_sectors = self.encrypt_and_encode_sectors(sectors)
            self.stats['encrypted_size'] = sum(s['size'] for s in encrypted_sectors.values())

            # Step 5: Scatter across network
            self.scatter_sectors(encrypted_sectors)

            # Step 6: Simulate data loss (optional)
            if simulate_loss > 0:
                available_sectors, _lost = self.simulate_data_loss(simulate_loss)
            else:
                available_sectors = self.network.get_all_sectors()

            # Step 7: Recover and reconstruct
            reconstructed_compressed = self.recover_and_reconstruct(available_sectors)

            # Step 8: Decompress
            if self.config.get('compression', True):
                self.log("Decompressing...")
                reconstructed_data = self.decompress_data(reconstructed_compressed)
            else:
                reconstructed_data = reconstructed_compressed

            # Step 9: Verify
            success = self.verify_integrity(reconstructed_data, checksum)

            # Step 10: Save
            if success:
                self.save_file(reconstructed_data, output_path)

            # Calculate statistics
            self.stats['processing_time'] = time.time() - start_time

            # Print summary
            print("\n" + "="*70)
            print("📊 PROCESSING SUMMARY")
            print("="*70)
            print(f"Original size:     {self.stats['original_size']:,} bytes")
            print(f"Compressed size:   {self.stats['compressed_size']:,} bytes")
            print(f"Total sectors:     {self.stats['total_sectors']:,}")
            print(f"Compression ratio: {self.stats['compression_ratio']*100:.2f}%")
            print(f"Processing time:   {self.stats['processing_time']:.3f} seconds")
            if self.stats['processing_time'] > 0:
                throughput = self.stats['original_size'] / self.stats['processing_time'] / 1024 / 1024
                print(f"Throughput:        {throughput:.2f} MB/s")
            print("="*70)

            if success:
                print("✅ PROCESSING COMPLETE - SUCCESS!")
            else:
                print("❌ PROCESSING COMPLETE - FAILED!")
            print("="*70 + "\n")

            return success

        except Exception as e:
            # Top-level boundary of the pipeline: report and signal failure.
            self.log(f"ERROR: {str(e)}", "ERROR")
            import traceback
            traceback.print_exc()
            return False

def create_test_file(filepath: str, size_kb: int = 100):
    """Create a zero-filled test file wrapped in JPEG-like markers.

    The file starts with a JFIF header, ends with the JPEG end marker and is
    exactly ``size_kb`` KiB long (when that is large enough to hold both
    markers; otherwise only the markers are written). The filler is zeros,
    so the file is highly compressible.
    """
    print(f"Creating test file: {filepath} ({size_kb} KB)")

    header = b'\xFF\xD8\xFF\xE0\x00\x10JFIF'  # JPEG header
    footer = b'\xFF\xD9'  # JPEG footer
    filler_len = max(0, size_kb * 1024 - len(header) - len(footer))

    with open(filepath, 'wb') as f:
        f.write(header + b'\x00' * filler_len + footer)

    print("✓ Test file created")

def main(argv=None):
    """Main entry point with CLI.

    ``argv`` is the list of command-line arguments to parse; ``None`` means
    use ``sys.argv[1:]``. The process always ends through ``sys.exit``:
    status 0 on success, 1 on failure or when there is nothing to do, and 2
    (from argparse) on invalid arguments.
    """
    import argparse

    parser = argparse.ArgumentParser(
        description='Pi-RAP: Pi-based Reconstruction & Archival Protocol',
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog=(
            "\n"
            "Examples:\n"
            "  python pi_rap_complete.py -i myfile.jpg\n"
            "  python pi_rap_complete.py -i myfile.jpg --loss 15\n"
            "  python pi_rap_complete.py -i myfile.jpg --no-compression --no-encryption\n"
            "  python pi_rap_complete.py --test\n"
        ),
    )

    parser.add_argument('-i', '--input', help='Input file path')
    parser.add_argument('-o', '--output', help='Output file path')
    parser.add_argument('--loss', type=float, default=0,
                        help='Simulate data loss percentage (0-100)')
    parser.add_argument('--sector-size', type=int, default=50000,
                        help='Sector size in bits (default: 50000)')
    parser.add_argument('--nodes', type=int, default=5,
                        help='Number of network nodes (default: 5)')
    parser.add_argument('--seed', type=int, default=0,
                        help='Encryption seed (default: 0)')
    parser.add_argument('--no-compression', action='store_true',
                        help='Disable compression')
    parser.add_argument('--no-encryption', action='store_true',
                        help='Disable encryption')
    parser.add_argument('--no-error-correction', action='store_true',
                        help='Disable error correction')
    parser.add_argument('--test', action='store_true',
                        help='Run with test file')
    parser.add_argument('--quiet', action='store_true',
                        help='Suppress verbose output')

    args = parser.parse_args(argv)

    # Reject values the pipeline cannot work with before doing anything else.
    if not 0 <= args.loss <= 100:
        parser.error("--loss must be between 0 and 100")
    if args.sector_size < 8:
        parser.error("--sector-size must be at least 8 bits")
    if args.nodes < 1:
        parser.error("--nodes must be at least 1")

    # Nothing to process: show usage without building the file system first.
    if not args.input and not args.test:
        parser.print_help()
        sys.exit(1)

    # Create configuration
    config = {
        'sector_size': args.sector_size,
        'compression': not args.no_compression,
        'encryption': not args.no_encryption,
        'error_correction': not args.no_error_correction,
        'network_nodes': args.nodes,
        'encryption_seed': args.seed,
        'verbose': not args.quiet,
    }

    # Initialize system
    pirap = PiRAPFileSystem(config)

    # Test mode: generate a sample file and use it as the input
    if args.test:
        test_file = "test_image.jpg"
        create_test_file(test_file, size_kb=500)
        args.input = test_file
        if not args.output:
            args.output = "recovered_test.jpg"

    # Process file
    success = pirap.process_file(
        input_path=args.input,
        output_path=args.output,
        simulate_loss=args.loss
    )
    sys.exit(0 if success else 1)

# Run the CLI only when executed as a script, not when imported.
if __name__ == "__main__":
    main()

Discussion in the ATmosphere

Loading comments...