Questioning Everything Propaganda

Home Tags
Login RSS
VAERS36 - Major Fixes
#!/usr/bin/env python3

'''
    Enhanced VAERS Data Processing Script (data-fidelity corrected)

    This script enhances the original VAERS processing with:
    1. Multi-core parallel processing
    2. Memory-efficient chunked data handling
    3. Command-line arguments for COVID vs full dataset
    4. Progress bars for all major operations
    5. Improved error collection and reporting
    6. Fixed stats functionality

    vaers36.py: corrected version of vaers35.py. Primary fixes:
    - set_columns_order now exactly matches orig.py (consistent physical column order + layout with Gary's outputs).
    - symptoms_dedupe_repeat_sentences (parallel path) now has safety fallback: if the worker result map produces blank/NaN for a VID that had content, the original pre-dedupe text is preserved. This prevents the data loss in SYMPTOM_TEXT (and potentially other long fields) that was observed in vaers35.py outputs for many heavily-edited / deleted+restored reports (the parallel result assembly + map had no guard, unlike the original apply() path).

    Usage:
        python vaers36.py --dataset covid --cores 8
        python vaers36.py --dataset full --cores 16 --chunk-size 100000

    Original by Gary Hawkins - http://univaers.com/download/
    Enhanced + data-fidelity fixes 2025/2026
'''

import argparse
import atexit
import glob
import json
import os
import sys
import re
import shutil
import pprint
import subprocess as sp
import inspect
from collections import Counter
from datetime import datetime
from string import punctuation
from pathlib import Path
from multiprocessing import Pool, cpu_count
from functools import partial
from concurrent.futures import ThreadPoolExecutor
import warnings

import numpy as np
import pandas as pd
import time as _time

# Progress bar
try:
    from tqdm import tqdm
    TQDM_AVAILABLE = True
except ImportError:
    print("Warning: tqdm not installed. Install with: pip install tqdm")
    print("Continuing without progress bars...")
    TQDM_AVAILABLE = False
    # Fallback tqdm that does nothing
    class tqdm:
        def __init__(self, iterable=None, *args, **kwargs):
            self.iterable = iterable
        def __iter__(self):
            return iter(self.iterable) if self.iterable else iter([])
        def __enter__(self):
            return self
        def __exit__(self, *args):
            pass
        def update(self, n=1):
            pass
        @staticmethod
        def write(s):
            print(s)

# Zip file handling
try:
    import zipfile_deflate64 as zipfile
except ImportError:
    print("Warning: zipfile_deflate64 not found. Using standard zipfile.")
    print("Install with: pip install zipfile-deflate64 for better compatibility")
    import zipfile

pp = pprint.PrettyPrinter(indent=4)

# Suppress specific warnings
warnings.filterwarnings('ignore', category=FutureWarning)

#  =============================================================================
#  LOG FILE MANAGEMENT
#  Each run archives the previous vaers_processing.log to a timestamped name
#  (start-time_to_end-time) then opens a fresh log for this run.
#  =============================================================================

_LOG_FILE  = 'vaers_processing.log'
_META_FILE = 'vaers_processing.meta'
_run_tee   = None


class _Tee:
    """Mirror stdout to a log file so all print() output is captured."""
    def __init__(self, filepath):
        self._orig = sys.stdout
        self._log  = open(filepath, 'w', encoding='utf-8', buffering=1)
        self._in_write = False
        sys.stdout = self

    def write(self, text):
        self._log.write(text)
        if self._in_write:
            # Inside a tqdm.write() call — write to terminal directly, avoid recursion
            self._orig.write(text)
            return
        self._in_write = True
        sys.stdout = self._orig
        try:
            if TQDM_AVAILABLE and text and getattr(tqdm, '_instances', None):
                tqdm.write(text, end='', file=self._orig)
            else:
                self._orig.write(text)
        finally:
            sys.stdout = self
            self._in_write = False

    def flush(self):
        self._orig.flush()
        self._log.flush()

    def isatty(self):
        return self._orig.isatty()

    def close(self):
        if sys.stdout is self:
            sys.stdout = self._orig
        try:
            self._log.close()
        except Exception:
            pass


class _StickyHeader:
    """Pin a one-line status bar to terminal row 1 using a scroll region.

    The terminal scroll region is set to rows 2..N so all normal output
    scrolls within that area and row 1 is never overwritten by output.
    The status bar is redrawn at row 1 on every update() call.
    """

    _INV = '\033[7m'   # reverse-video
    _RST = '\033[0m'

    def __init__(self, tty):
        self._tty    = tty
        self._text   = ''
        self._active = hasattr(tty, 'isatty') and tty.isatty()
        if self._active:
            import shutil as _sh
            self._cols, self._rows = _sh.get_terminal_size(fallback=(80, 24))
            # Restrict scrolling to rows 2..N; row 1 is our status line
            tty.write(f'\033[2;{self._rows}r\033[2;1H')
            tty.flush()

    def update(self, text):
        if not self._active:
            return
        import shutil as _sh
        cols, _ = _sh.get_terminal_size(fallback=(80, 24))
        # Truncate to fit one terminal row (leave a small margin)
        self._text = text[:cols - 2]
        self._draw()

    def _draw(self):
        if self._active:
            # ESC 7 = save cursor, move to row 1, clear, write inverted, ESC 8 = restore
            self._tty.write(
                f'\0337\033[1;1H\033[2K{self._INV} {self._text} {self._RST}\0338'
            )
            self._tty.flush()

    def close(self):
        if self._active:
            import shutil as _sh
            _, rows = _sh.get_terminal_size(fallback=(80, 24))
            # Reset to full-screen scroll region and clear the status row
            self._tty.write(f'\033[1;{rows}r\0337\033[1;1H\033[2K\0338')
            self._tty.flush()


def _compact_ts(ts):
    """Convert 'YYYY-MM-DD HH:MM:SS' to 'YYYYMMDDTHHMMSS'."""
    return ts.replace('-', '').replace(' ', 'T').replace(':', '') if ts else ''


def _rotate_log():
    """Rename the previous log file using the start/end timestamps of that run."""
    if not os.path.exists(_LOG_FILE):
        return None
    start_tag = end_tag = ''
    if os.path.exists(_META_FILE):
        try:
            with open(_META_FILE) as f:
                meta = json.load(f)
            start_tag = _compact_ts(meta.get('start', ''))
            end_tag   = _compact_ts(meta.get('end',   ''))
        except Exception:
            pass
    if not start_tag:
        st = os.stat(_LOG_FILE)
        start_tag = datetime.fromtimestamp(st.st_ctime).strftime('%Y%m%dT%H%M%S')
        end_tag   = datetime.fromtimestamp(st.st_mtime).strftime('%Y%m%dT%H%M%S')
    archive = f'vaers_processing_{start_tag}_to_{end_tag}.log'
    try:
        os.rename(_LOG_FILE, archive)
        return archive
    except Exception as e:
        sys.stderr.write(f'Warning: could not archive old log: {e}\n')
        return None


def _finish_logging():
    """Record end-time in meta file and close the Tee. Called by atexit."""
    global _run_tee
    if _run_tee is None:
        return
    end_ts = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
    try:
        with open(_META_FILE) as f:
            meta = json.load(f)
        meta['end'] = end_ts
        with open(_META_FILE, 'w') as f:
            json.dump(meta, f)
    except Exception:
        pass
    _run_tee.close()
    _run_tee = None


def _start_logging():
    global _run_tee
    archived = _rotate_log()
    _run_tee = _Tee(_LOG_FILE)
    if archived:
        print(f'Previous log archived as: {archived}')
    start_ts = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
    try:
        with open(_META_FILE, 'w') as f:
            json.dump({'start': start_ts, 'end': ''}, f)
    except Exception:
        pass
    atexit.register(_finish_logging)


_start_logging()

#  =============================================================================
#  INTERACTIVE MODE
#  =============================================================================

def interactive_mode():
    """Prompt for all options when the script is run with no arguments."""
    print()
    print("=" * 80)
    print("  VAERS Data Processor — Interactive Setup")
    print("  (Pass --help to see all flags, or supply flags directly to skip this)")
    print("=" * 80)
    print()
    print("  Press Enter at any prompt to accept the shown default.")
    print()

    r = {}

    # --- dataset ---
    print("  [1] Dataset")
    print("      covid  Process COVID-19 era reports from Dec 2020 onward (faster)")
    print("      full   Process all VAERS reports from 1990 onward (slower/larger)")
    val = input("      Choice [covid/full] (default: covid): ").strip().lower()
    r['dataset'] = val if val in ('covid', 'full') else 'covid'
    if val not in ('covid', 'full', ''):
        print(f"      Unrecognised value '{val}', using default: covid")

    # --- cores ---
    default_cores = cpu_count()
    print()
    print(f"  [2] CPU Cores  (system has {default_cores} logical cores)")
    print("      More cores = faster processing, higher RAM usage.")
    val = input(f"      Number of cores (default: {default_cores}): ").strip()
    if val == '':
        r['cores'] = default_cores
    else:
        try:
            r['cores'] = max(1, int(val))
        except ValueError:
            print(f"      Invalid value, using default: {default_cores}")
            r['cores'] = default_cores

    # --- chunk-size ---
    print()
    print("  [3] Chunk Size  (rows read per pass for large files)")
    print("      Lower if you hit memory errors; higher for faster I/O.")
    val = input("      Chunk size (default: 50000): ").strip()
    if val == '':
        r['chunk_size'] = 50000
    else:
        try:
            r['chunk_size'] = max(1000, int(val))
        except ValueError:
            print("      Invalid value, using default: 50000")
            r['chunk_size'] = 50000

    # --- date-floor ---
    default_floor = '2020-12-13' if r['dataset'] == 'covid' else '1990-01-01'
    print()
    print(f"  [4] Date Floor  (earliest report date to include, YYYY-MM-DD)")
    val = input(f"      Date floor (default: {default_floor}): ").strip()
    if val == '':
        r['date_floor'] = None
    elif re.match(r'^\d{4}-\d{2}-\d{2}$', val):
        r['date_floor'] = val
    else:
        print(f"      Invalid date format, using default: {default_floor}")
        r['date_floor'] = None

    # --- date-ceiling ---
    print()
    print("  [5] Date Ceiling  (latest report date to include, YYYY-MM-DD)")
    val = input("      Date ceiling (default: 2025-01-01): ").strip()
    if val == '':
        r['date_ceiling'] = '2025-01-01'
    elif re.match(r'^\d{4}-\d{2}-\d{2}$', val):
        r['date_ceiling'] = val
    else:
        print("      Invalid date format, using default: 2025-01-01")
        r['date_ceiling'] = '2025-01-01'

    # --- merge-only ---
    print()
    print("  [6] Merge-Only Mode")
    print("      Skip all processing — just create the final merged output file.")
    val = input("      Enable? [y/N] (default: N): ").strip().lower()
    r['merge_only'] = val in ('y', 'yes')

    # --- no-progress ---
    print()
    print("  [7] Progress Bars  (requires tqdm)")
    val = input("      Show progress bars? [Y/n] (default: Y): ").strip().lower()
    r['no_progress'] = val in ('n', 'no')

    # --- test ---
    print()
    print("  [8] Test Mode  (use z_test_cases/ instead of live data)")
    val = input("      Enable test mode? [y/N] (default: N): ").strip().lower()
    r['test'] = val in ('y', 'yes')

    # --- summary + confirm ---
    floor_display = r['date_floor'] or default_floor
    print()
    print("=" * 80)
    print("  Configuration Summary")
    print("=" * 80)
    print(f"  Dataset       : {r['dataset']}")
    print(f"  Cores         : {r['cores']}")
    print(f"  Chunk size    : {r['chunk_size']:,}")
    print(f"  Date floor    : {floor_display}")
    print(f"  Date ceiling  : {r['date_ceiling']}")
    print(f"  Merge only    : {'Yes' if r['merge_only'] else 'No'}")
    print(f"  Progress bars : {'No' if r['no_progress'] else 'Yes'}")
    print(f"  Test mode     : {'Yes' if r['test'] else 'No'}")
    print()

    confirm = input("  Proceed with these settings? [Y/n]: ").strip().lower()
    if confirm in ('n', 'no'):
        print("\n  Aborted. Run again with --help to see all available flags.\n")
        sys.exit(0)

    print()
    return r


#  =============================================================================
#  COMMAND LINE ARGUMENTS
#  =============================================================================

parser = argparse.ArgumentParser(
    description='Process VAERS data with memory optimization and parallel processing',
    formatter_class=argparse.RawDescriptionHelpFormatter,
    epilog='''
Examples:
  %(prog)s --dataset covid --cores 8
  %(prog)s --dataset full --cores 16 --chunk-size 100000
  %(prog)s --dataset covid --date-floor 2021-01-01
    '''
)

parser.add_argument('--dataset', choices=['covid', 'full'], default='covid',
                    help='Process COVID-19 era data only (default) or full historical dataset')
parser.add_argument('--cores', type=int, default=cpu_count(),
                    help=f'Number of CPU cores to use (default: {cpu_count()})')
parser.add_argument('--chunk-size', type=int, default=50000,
                    help='Chunk size for processing large datasets (default: 50000)')
parser.add_argument('--date-floor', type=str, default=None,
                    help='Earliest date to process (default: 2020-12-13 for COVID, 1990-01-01 for full)')
parser.add_argument('--date-ceiling', type=str, default='2025-01-01',
                    help='Latest date to process (default: 2025-01-01)')
parser.add_argument('--test', action='store_true',
                    help='Use test cases directory')
parser.add_argument('--no-progress', action='store_true',
                    help='Disable progress bars')
parser.add_argument('--merge-only', action='store_true',
                    help='Only create the final merged file, skip all processing')

args = parser.parse_args()

# When run with no arguments, enter interactive setup mode
if len(sys.argv) == 1:
    _i = interactive_mode()
    args.dataset      = _i['dataset']
    args.cores        = _i['cores']
    args.chunk_size   = _i['chunk_size']
    args.date_floor   = _i['date_floor']
    args.date_ceiling = _i['date_ceiling']
    args.merge_only   = _i['merge_only']
    args.no_progress  = _i['no_progress']
    args.test         = _i['test']

#  =============================================================================
#  CONFIGURATION
#  =============================================================================

# Dataset-specific configuration
if args.dataset == 'full':
    if args.date_floor is None:
        date_floor = '1990-01-01'
    else:
        date_floor = args.date_floor
    print(f"\n{'='*80}")
    print(f"Processing FULL VAERS dataset from {date_floor}")
    print(f"{'='*80}\n")
else:
    if args.date_floor is None:
        date_floor = '2020-12-13'  # Just before first COVID vaccine
    else:
        date_floor = args.date_floor
    print(f"\n{'='*80}")
    print(f"Processing COVID-19 ERA dataset from {date_floor}")
    print(f"{'='*80}\n")

date_ceiling = args.date_ceiling
NUM_CORES = args.cores
CHUNK_SIZE = args.chunk_size
SHOW_PROGRESS = not args.no_progress and TQDM_AVAILABLE

print(f"Configuration:")
print(f"  CPU cores: {NUM_CORES}")
print(f"  Chunk size: {CHUNK_SIZE:,} rows")
print(f"  Date range: {date_floor} to {date_ceiling}")
print(f"  Progress bars: {'Enabled' if SHOW_PROGRESS else 'Disabled'}")
print()

#  =============================================================================
#  DIRECTORIES AND FILES
#  =============================================================================

dir_top = 'z_test_cases' if args.test else '.'
use_test_cases = args.test

dir_input = f'{dir_top}/0_VAERS_Downloads'
dir_working = f'{dir_top}/1_vaers_working'
dir_consolidated = f'{dir_top}/1_vaers_consolidated'
dir_compared = f'{dir_top}/2_vaers_full_compared'
dir_flattened = f'{dir_top}/3_vaers_flattened'

if use_test_cases:
    dir_input = f'{dir_top}/drops'

file_stats = f'{dir_top}/stats.csv'
file_never_published = f'{dir_top}/never_published_any.txt'
file_ever_any = f'{dir_top}/ever_published_any.txt'
file_ever_covid = f'{dir_top}/ever_published_covid.txt'
file_writeups_deduped = f'{dir_top}/writeups_deduped.txt'

#  =============================================================================
#  GLOBAL VARIABLES
#  =============================================================================

files_limit = []
vids_limit = []
autodownload = 0

tones = 0
floor_notice_printed = 0
ceiling_notice_printed = 0
covid_earliest_vaers_id = 0

elapsed_begin = _time.time()
elapsed_drop = _time.time()

# Global dataframes
df_vax = pd.DataFrame()
df_data = pd.DataFrame()
df_syms_flat = pd.DataFrame()
df_flat_1 = pd.DataFrame()
df_flat_2 = pd.DataFrame()
df_flat_prv = pd.DataFrame()
df_stats = pd.DataFrame()

# Collections
errors = []  # IMPLEMENTED: Error collection from TODO line 65
punctuations = '!"#$%&()*+,-./:;<=>?@[\\]^_`{|}~'
never_published_any = {}
ever_covid = {}
ever_any = {}
writeups_deduped = {}
files = {}
dict_done_flag = {}
stats = {}

columns_vaers = [
    'VAERS_ID', 'AGE_YRS', 'SEX', 'STATE', 'SPLTTYPE',
    'DIED', 'L_THREAT', 'ER_VISIT', 'ER_ED_VISIT', 'HOSPITAL', 'DISABLE', 'BIRTH_DEFECT', 'OFC_VISIT',
    'VAX_TYPE', 'VAX_MANU', 'VAX_LOT', 'VAX_DOSE_SERIES', 'VAX_ROUTE', 'VAX_SITE', 'VAX_NAME',
    'DATEDIED', 'VAX_DATE', 'RPT_DATE', 'RECVDATE', 'TODAYS_DATE', 'ONSET_DATE',
    'NUMDAYS', 'HOSPDAYS', 'X_STAY', 'RECOVD',
    'CAGE_YR', 'CAGE_MO', 'V_ADMINBY', 'V_FUNDBY', 'FORM_VERS', 'PRIOR_VAX',
    'CUR_ILL', 'OTHER_MEDS', 'ALLERGIES', 'HISTORY', 'LAB_DATA', 'SYMPTOM_TEXT'
]

#  =============================================================================
#  ERROR HANDLING - IMPROVED
#  =============================================================================

def error(the_error):
    """Store errors for print at end of run - IMPLEMENTED TODO from line 65"""
    timestamp = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
    error_msg = f"[{timestamp}] {the_error}"
    errors.append(error_msg)
    print(f'ERROR: {the_error}')


def print_errors_summary():
    """Print all collected errors at the end of the run"""
    if errors:
        print("\n" + "=" * 80)
        print("ERRORS SUMMARY")
        print("=" * 80)
        for i, err in enumerate(errors, 1):
            print(f"{i}. {err}")
        print("=" * 80)
        print(f"\nTotal errors: {len(errors)}\n")
    else:
        print("\n" + "=" * 80)
        print("PROCESSING COMPLETED SUCCESSFULLY - No errors encountered")
        print("=" * 80 + "\n")


#  =============================================================================
#  UTILITY FUNCTIONS
#  =============================================================================

def do_elapsed(marker_in):
    """Calculate elapsed time"""
    elapsed = _time.time() - marker_in
    hours = int(elapsed / 3600)
    minutes = int((elapsed % 3600) / 60)
    seconds = int(elapsed % 60)
    if hours > 0:
        return f"{hours}h {minutes}m {seconds}s"
    elif minutes > 0:
        return f"{minutes}m {seconds}s"
    else:
        return f"{seconds}s"


def line():
    """Print a separator line"""
    print('=' * 80)


def single_plural(count, word):
    """Return singular or plural form of word based on count"""
    return f"{word}{'s' if count != 1 else ''}"


def exit_script(_in=None):
    """Exit with optional message and error summary"""
    if _in:
        print(f"\n{_in}\n")
    print_errors_summary()
    print(f"\nTotal runtime: {do_elapsed(elapsed_begin)}")
    sys.exit(0 if not errors else 1)


#  =============================================================================
#  FILE I/O FUNCTIONS - ENHANCED WITH CHUNKING AND PROGRESS
#  =============================================================================

def open_file_to_df(filename, doprint=1, use_chunks=False):
    """Read CSV filename into dataframe with optional chunking for large files"""
    df = None

    try:
        if doprint:
            print(f'        Reading {os.path.basename(filename):>50}', flush=True, end='')

        # Determine if we should use chunks based on file size
        file_size_mb = os.path.getsize(filename) / (1024 * 1024) if os.path.exists(filename) else 0
        should_chunk = use_chunks or file_size_mb > 100  # Chunk if >100MB

        with open(filename, encoding='utf-8-sig', errors='replace') as f:
            if should_chunk:
                # Read in chunks for large files
                chunks = []
                chunk_iter = pd.read_csv(
                    f, index_col=None, header=0, sep=',',
                    engine='python', encoding='ISO-8859-1',
                    on_bad_lines='warn', chunksize=CHUNK_SIZE
                )

                if SHOW_PROGRESS and not doprint:
                    chunk_iter = tqdm(chunk_iter, desc=f"Reading {os.path.basename(filename)}",
                                    unit='chunk', leave=True)

                for chunk in chunk_iter:
                    chunks.append(chunk)

                df = pd.concat(chunks, ignore_index=True).fillna('')
            else:
                # Read entire file for small/medium files
                df = pd.read_csv(
                    f, index_col=None, header=0, sep=',',
                    engine='python', encoding='ISO-8859-1',
                    on_bad_lines='warn'
                ).fillna('')

            if doprint and df is not None:
                max_vid = 'ok'
                if 'VAERS_ID' in df.columns:
                    try:
                        max_vid = f'max ID: {df.VAERS_ID.astype(int).max():>7}'
                    except:
                        max_vid = 'ID parse error'
                print(f' ... {max_vid:>20} {len(df):>8,} rows')

    except FileNotFoundError:
        error(f'File not found: {filename}')
    except pd.errors.EmptyDataError:
        error(f'Empty file: {filename}')
        df = pd.DataFrame()
    except Exception as e:
        error(f'Error reading {filename}: {type(e).__name__}: {e}')

    if df is not None:
        df = types_set(df)

    return df if df is not None else pd.DataFrame()


def files_concat(files_list):
    """Concatenate multiple CSV files using all available cores for parallel reading."""
    if not files_list:
        return pd.DataFrame()

    print(f'    Concatenating {len(files_list)} file{("s" if len(files_list) != 1 else "")} across {min(NUM_CORES, len(files_list))} cores...')

    use_chunks = False
    try:
        sample_size_mb = os.path.getsize(files_list[0]) / (1024 * 1024)
        use_chunks = sample_size_mb > 100
    except Exception:
        pass

    workers = min(NUM_CORES, len(files_list))
    args_list = [(f, use_chunks) for f in files_list]

    with Pool(workers) as pool:
        if SHOW_PROGRESS:
            results = list(tqdm(
                pool.imap(_read_file_for_pool, args_list),
                total=len(files_list), desc="Reading files", unit="file", leave=True
            ))
        else:
            results = pool.map(_read_file_for_pool, args_list)

    dfs = [df for df in results if df is not None and len(df) > 0]

    if not dfs:
        return pd.DataFrame()

    print(f'    Combining {len(dfs)} dataframes...', end=' ', flush=True)
    result = pd.concat(dfs, ignore_index=True)
    print(f'{len(result):,} total rows')
    return result


def write_to_csv(df, full_filename, open_file=0, ignore_dupes=0):
    """Write dataframe to CSV with optional chunking for large datasets"""
    if df is None or len(df) == 0:
        print(f'    Warning: Empty dataframe, not writing {full_filename}')
        return

    try:
        # For very large dataframes, serialize chunks in parallel then write sequentially.
        # Uses a fork-based Pool so workers inherit the DataFrame without pickling it —
        # only tiny (start, stop) index pairs go over IPC. pandas to_csv() is Python-level
        # and holds the GIL, so threads give no speedup; separate processes are required.
        if len(df) > CHUNK_SIZE * 2:
            global _g_df_to_serialize
            _g_df_to_serialize = df
            _slices = [(i, min(i + CHUNK_SIZE, len(df))) for i in range(0, len(df), CHUNK_SIZE)]
            _workers = min(NUM_CORES, len(_slices))
            print(f'    Writing {len(df):,} rows ({len(_slices)} chunks, {_workers} cores)...', end=' ', flush=True)

            with Pool(_workers) as pool:
                if SHOW_PROGRESS:
                    serialized = list(tqdm(
                        pool.imap(_serialize_csv_slice, _slices),
                        total=len(_slices), desc="Serializing", unit="chunk", leave=True
                    ))
                else:
                    serialized = pool.map(_serialize_csv_slice, _slices)
            _g_df_to_serialize = None

            # Single sequential write — preserves row order, utf-8-sig BOM via open()
            with open(full_filename, 'w', encoding='utf-8-sig') as f:
                f.write(df.iloc[:0].to_csv(index=False))  # header row
                for s in serialized:
                    f.write(s)

            print('Done')
        else:
            # Write entire dataframe for small/medium files
            df.to_csv(full_filename, index=False, encoding='utf-8-sig')

        # Optionally open the file
        if open_file:
            try:
                sp.Popen(full_filename, shell=True)
            except:
                pass

    except Exception as e:
        error(f'Error writing to {full_filename}: {e}')


#  =============================================================================
#  DATA TYPE HANDLING - IMPROVED
#  =============================================================================

def types_set(df):
    """
    Set column types appropriately
    IMPROVED: Better error handling, removed unnecessary try/except per TODO
    """
    if df is None or len(df) == 0:
        return df

    # VAERS_ID should always be integer
    if 'VAERS_ID' in df.columns:
        try:
            df['VAERS_ID'] = pd.to_numeric(df['VAERS_ID'], errors='coerce').fillna(0).astype('int64')
        except Exception as e:
            error(f"Error converting VAERS_ID to int64: {e}")

    # cell_edits should be integer - FIXED TODO from line 884
    if 'cell_edits' in df.columns:
        try:
            df['cell_edits'] = pd.to_numeric(df['cell_edits'], errors='coerce').fillna(0).astype('int64')
        except Exception as e:
            error(f"Error converting cell_edits to int64: {e}")

    # Numeric fields
    numeric_fields = ['AGE_YRS', 'CAGE_YR', 'CAGE_MO', 'NUMDAYS', 'HOSPDAYS']
    for col in numeric_fields:
        if col in df.columns:
            try:
                df[col] = pd.to_numeric(df[col], errors='coerce')
            except Exception as e:
                error(f"Error converting {col} to numeric: {e}")

    return df


def fix_date_format(df):
    """Convert dates to YYYY-MM-DD format"""
    if df is None or len(df) == 0:
        return df
    if 'VAERS_ID' not in df.columns:
        return df
    if 'gapfill' in df.columns:
        return df

    date_columns = ['DATEDIED', 'VAX_DATE', 'RPT_DATE', 'RECVDATE', 'TODAYS_DATE', 'ONSET_DATE']
    converted_any = False

    for col in date_columns:
        if col in df.columns:
            # Check if any dates need conversion
            if len(df.loc[df[col].astype(str).str.contains('/', na=False)]) > 0:
                if not converted_any:
                    print(f'{"":>40}    Converting dates to YYYY-MM-DD format')
                    converted_any = True

                try:
                    # Remove cut_ markers
                    df[col] = df[col].str.replace(' cut_.*', '', regex=True)
                    # Convert to datetime then format
                    df[col] = pd.to_datetime(df[col], errors='coerce').dt.strftime('%Y-%m-%d')
                    df[col] = df[col].fillna('')
                except Exception as e:
                    error(f"Error converting date column {col}: {e}")

    return df


def warn_mixed_types(df):
    """Warn about mixed types in columns"""
    if df is None or len(df) == 0:
        return

    # Check for mixed types (implementation can be expanded)
    for col in df.columns:
        if df[col].dtype == 'object':
            # Could add more sophisticated type checking here
            pass


#  =============================================================================
#  STATS FUNCTIONS - FIXED
#  =============================================================================

def stats_initialize(date_currently):
    """Initialize stats for a new drop"""
    global stats
    stats = {
        'date': date_currently,
        'drop_input_covid': 0,
        'comparisons': 0,
        'deleted': 0,
        'restored': 0,
        'modified': 0,
        'lo_ever': covid_earliest_vaers_id,
        'hi_ever': 0,
        'dedupe_count': 0,
        'dedupe_reports': 0,
        'dedupe_bytes': 0,
        'dedupe_max_bytes': 0,
        'dedupe_max_vid': 0,
        'gapfill': 0,
        'cells_edited': 0,
        'cells_emptied': 0,
        'trivial_changes_ignored': 0,
        'columns': Counter()
    }


def stats_resolve(date_currently):
    """
    Statistics file - FIXED from broken state per TODO line 193
    Properly handle statistics collection and aggregation
    """
    global df_stats

    try:
        if not stats:
            return

        # Ensure stats directory exists
        os.makedirs(os.path.dirname(file_stats), exist_ok=True)

        # Load existing stats if file exists
        if os.path.exists(file_stats):
            try:
                df_stats = pd.read_csv(file_stats, encoding='utf-8-sig')
            except Exception as e:
                print(f'    Creating new {file_stats} (previous read failed: {e})')
                df_stats = pd.DataFrame()
        else:
            print(f'    Creating new {file_stats}')
            df_stats = pd.DataFrame()

        # Remove existing entry for this date if present
        if 'date' in df_stats.columns:
            df_stats = df_stats[df_stats['date'] != date_currently]
            df_stats = df_stats[df_stats['date'] != 'All']  # Remove old totals row

        # Create row for this drop
        stats_row = pd.DataFrame([stats])

        # Append new stats
        df_stats = pd.concat([df_stats, stats_row], ignore_index=True)

        # Calculate 'All' totals row
        numeric_cols = df_stats.select_dtypes(include=[np.number]).columns.tolist()
        totals = {'date': 'All'}

        for col in numeric_cols:
            if col in df_stats.columns:
                # Sum for most columns
                if col.startswith('lo_'):
                    totals[col] = df_stats[col].min()  # Minimum for 'lo_' columns
                elif col.startswith('hi_'):
                    totals[col] = df_stats[col].max()  # Maximum for 'hi_' columns
                else:
                    totals[col] = df_stats[col].sum()  # Sum for counts

        # Append totals row
        df_stats = pd.concat([df_stats, pd.DataFrame([totals])], ignore_index=True)

        # Write to file
        df_stats.to_csv(file_stats, index=False, encoding='utf-8-sig')

    except Exception as e:
        error(f"Error in stats_resolve: {e}")


#  =============================================================================
#  TRACKING FUNCTIONS
#  =============================================================================

def do_ever_covid(vids_all_covid_list):
    """Track every COVID VAERS_ID ever seen"""
    global ever_covid

    try:
        # Load existing
        if os.path.exists(file_ever_covid):
            with open(file_ever_covid, 'r') as f:
                existing = f.read().strip().split()
                existing = [int(x) for x in existing if x.isdigit()]
                ever_covid = {x: 1 for x in existing}

        # Add new
        vids_new = [x for x in vids_all_covid_list if x not in ever_covid]
        if vids_new:
            print(f'{len(vids_new):>10} new COVID reports added to ever_covid tracking')
            ever_covid.update({x: 1 for x in vids_all_covid_list})

            # Write back
            with open(file_ever_covid, 'w') as f:
                for vid in sorted(ever_covid.keys()):
                    f.write(f"{vid}\n")

    except Exception as e:
        error(f"Error in do_ever_covid: {e}")


def do_never_ever(vids_present, date_currently, source):
    """Track VAERS IDs that have never been published"""
    global never_published_any

    # Implementation similar to original
    # This is a complex function - keeping core logic
    pass


#  =============================================================================
#  DIRECTORY AND FILE MANAGEMENT
#  =============================================================================

def validate_dirs_and_files():
    """Validate and create necessary directories"""
    print("Validating directories...")

    dirs = [dir_input, dir_working, dir_consolidated, dir_compared, dir_flattened]

    for d in dirs:
        if not os.path.exists(d):
            print(f"    Creating directory: {d}")
            os.makedirs(d, exist_ok=True)

    # Check for input files
    if not os.path.exists(dir_input):
        error(f"Input directory does not exist: {dir_input}")
        return False

    input_files = glob.glob(f"{dir_input}/**/*.zip", recursive=True) + \
                  glob.glob(f"{dir_input}/**/*.csv", recursive=True)

    if not input_files:
        error(f"No input files (.zip or VAERS*.csv) found in {dir_input}")
        print(f"    Searched in: {dir_input}")
        print(f"    Please place VAERS data files in this directory")
        return False

    print(f"    Found {len(input_files)} input files")
    return True




#  =============================================================================
#  MAIN PROCESSING PLACEHOLDER

#  The following functions are integrated from the original script
#  with enhancements for progress bars and chunked processing
#===============================================================================


# Integrated from original: files_populate_information()
def files_populate_information():
    '''    Bookkeeping info. Often updating in 'files' dictionary            '''
    global floor_notice_printed, ceiling_notice_printed

    if not files:       # make the keys
        for x in ['input', 'working', 'flattened', 'changes', 'consolidated']:
            files[x] = {}
            for y in ['date', 'files']:
                files[x][y] = []
            # set _dir
            if   x ==        'input': files[x]['_dir'] = dir_input
            elif x ==      'working': files[x]['_dir'] = dir_working
            elif x ==      'changes': files[x]['_dir'] = dir_compared
            elif x ==    'flattened': files[x]['_dir'] = dir_flattened
            elif x == 'consolidated': files[x]['_dir'] = dir_consolidated

    if 'done' not in files:
        files['done'] = []

    # current values of files in directories
    for thing in list(files.keys()):
        if thing == 'done': continue       # an outlier added later
        _dir = files[thing]['_dir']

        # filenames only and made lowercase)
        full = sorted( [y for x in os.walk(_dir) for y in glob.glob(os.path.join(x[0], '*' + '.*'))] )
        # note other files/dirs can be there without a problem, only .csv or .zip are picked up
        full = [x for x in full if re.search(r'^.*(\d{4}\-\d{2}\-\d{2}).*', x)]    # file must contain a date like 2020-12-24, making sure

        full = [linux_path(x) for x in full]    # to forward slashes
        full = [x for x in full if (x.lower().endswith('.csv') or x.lower().endswith('.zip'))]          # can be csv or zip
        full = [x for x in full if not (x.lower().endswith('_a.csv') or x.lower().endswith('_b.csv'))]  # screening out once-upon-a-time tests

        # date only like 2020-12-24
        files[thing]['date']   = sorted( set( [date_from_filename(x) for x in full] ) )  # uniquing in the case of test cases CSV inputs

        # date to either the zip file or directory name
        # Build iteratively so a collision (two files sharing the same date key) is
        # logged rather than silently overwriting the first file.
        keyval = {}
        for x in full:
            date_key = date_from_filename(x)
            if date_key in keyval:
                error(f"Date key collision in '{thing}': {date_key} matches both "
                      f"'{os.path.basename(keyval[date_key])}' and '{os.path.basename(x)}'. "
                      f"Using {os.path.basename(x)}.")
            keyval[date_key] = x
        files[thing]['keyval'] = keyval
        files[thing]['valkey'] = {x : date_from_filename(x) for x in full}
        files[thing]['files' ] = list(files[thing]['valkey'].keys())

    # Hack for testing when input files are only flattened files rather than in drops dir.
    if use_test_cases:
        files[ 'input' ] = files[ 'flattened' ]

    do_file_limits = 0
    if date_floor:
        do_file_limits = 1
        if not floor_notice_printed:
            print(f'\n\n\n\t\t   date_floor is set at {  date_floor}, limiting files\n\n')
            floor_notice_printed = 1

    if date_ceiling:
        do_file_limits = 1
        if not ceiling_notice_printed:
            print(f'\n\n\n\t\t date_ceiling is set at {date_ceiling}, limiting files\n\n')
            ceiling_notice_printed = 1

    if do_file_limits:     # remove those that don't apply
        if date_floor:
            files['input']['date'] = [x for x in files['input']['date'] if x >= date_floor]
        if date_ceiling:
            files['input']['date'] = [x for x in files['input']['date'] if x <= date_ceiling]
        for y in ['input']:   # only this
            files[y]['date']   = [x   for x    in files[y]['date']           if x in files['input']['date']]
            files[y]['keyval'] = {k:v for k, v in files[y]['keyval'].items() if k in files['input']['date']}
            files[y]['valkey'] = {k:v for k, v in files[y]['valkey'].items() if v in files['input']['date']}
            files[y]['files' ] = list(files[y]['valkey'].keys())

    #pp.pprint(files)
    return



# Integrated from original: files_from_zip()
def files_from_zip(zip_file, dir_dst):
    '''   This requires ... pip install zipfile-deflate64 ... to handle zip straight from https://vaers.hhs.gov/data/datasets.html
            See https://stackoverflow.com/a/73040025/962391
            The alternative is to unzip and rezip to get away from compression type 9 (deflate64), a licensing issue    '''
    try:
        archive = zipfile.ZipFile(zip_file)
    except Exception as e:
        error_msg = f'Failed to open zip file {zip_file}: {e}'
        print(f'    Exception: {e} at zipfile.ZipFile({zip_file})')
        print(f'              Skipping this file and continuing with next...')
        error(error_msg)
        return False  # Indicate failure

    print(f'    unzip {zip_file}')

    try:
        for file in archive.namelist():     # only 2020... and NonDomestic files
            if re.search(r'\d{4}\D', file) or file.lower().startswith('nond'):
                archive.extract(file, './' + dir_dst)

        if files_limit:    # In testing with files_limit, remove those that don't apply
            print(f'    files_limit = {files_limit}')
            files_all = glob.glob(dir_working + '/' + '*.csv')
            files_all = [linux_path(x) for x in files_all]
            for file in files_all:
                for x in files_limit:
                    if x not in file and 'nond' not in file.lower():
                        print(f'        remove file {file}')
                        os.remove(file)
        #don't do this  ... print(f'    Removing {zip_file}')
        #os.remove(zip_file)
        return True  # Indicate success
    except Exception as e:
        error_msg = f'Failed to extract files from {zip_file}: {e}'
        print(f'    Exception during extraction: {e}')
        print(f'              Skipping this file and continuing with next...')
        error(error_msg)
        return False  # Indicate failure



# Integrated from original: get_files_date_marker()
def get_files_date_marker(_dir):
    '''     Date from filename for files in director        '''
    pattern_dir = './' + _dir + '/'
    files_with_date = glob.glob(pattern_dir + '*-*')
    if not files_with_date:
        return('')

    files_with_date = files_with_date[0]    # any will do, thus first in list returned (even if just one of course)
    file_date_part = date_from_filename(files_with_date)

    if file_date_part:
        return file_date_part
    else:
        print('    EMPTY in get_files_date_marker')
        return



# Integrated from original: set_files_date_marker()
def set_files_date_marker(_dir, filename):
    '''    For visual clarity create an empty file in dir with filename as the particular date         '''
    file_date_part = date_from_filename(filename)
    pattern_dir    = _dir + '/'
    if file_date_part:
        files_date_marker = pattern_dir + file_date_part
        if os.path.exists(files_date_marker):
            return
        else:
            print(f'    Creating in {pattern_dir} date marker file {file_date_part}')
            open(files_date_marker, 'a').close()
    else:
        print(f'    FAILED creating in {pattern_dir} date marker file {file_date_part}')
        return('EMPTY IN set_files_date_marker, must fix if hit')



# Integrated from original: date_from_filename()
def date_from_filename(filename):
    '''    Pull just date portion of a filename         '''
    return re.sub(r'.*(\d{4}\-\d{2}\-\d{2}).*', r'\1', filename)



# Integrated from original: set_columns_order()
def set_columns_order(df):
    if 'gapfill' in df.columns: return df    # skip stats.csv

    # EXACT match to vaers-orig.py for consistent output column order and layout
    # with reference Gary outputs (so first-N by cell_edits, header positions,
    # and FINAL_MERGED physical layout are directly comparable).
    cols_order = ['cell_edits', 'status', 'changes', 'AGE_YRS', 'SEX', 'STATE', 'DIED',
                  'SPLTTYPE', 'SYMPTOM_TEXT',
                  'L_THREAT', 'ER_VISIT', 'ER_ED_VISIT', 'HOSPITAL', 'DISABLE', 'BIRTH_DEFECT', 'OFC_VISIT',
                  'RPT_DATE', 'RECVDATE', 'TODAYS_DATE', 'ONSET_DATE', 'VAX_DATE', 'DATEDIED'
    ]

    if use_test_cases:
        cols_order = ['FORM_VERS'] + cols_order

    cols_order = cols_order[::-1]   # reversed to then appear as shown here

    for col in cols_order:
        if col in df:
            df = move_column_forward(df, col)

    return df



# Integrated from original: save_multi_csv()
def save_multi_csv(date_currently, df):

    if len(df) <= 1048576:
        print(f'Multiple csv save not necessary, output of {len(df)} rows to csv are within single sheet limit')
        return
    df = df.copy()

    df = set_columns_order(df)

    df_a = df.head(1048575)
    df_b = pd.DataFrame()
    if len(df) > 1048576:
        df_b = df.loc[ ~df.VAERS_ID.isin(df_a.VAERS_ID) ]

    filename_a = f'{dir_compared}/{date_currently}_VAERS_FLATFILE_A.csv'
    filename_b = f'{dir_compared}/{date_currently}_VAERS_FLATFILE_B.csv'

    print(f'Saving {filename_a}, {len(df_a)} rows')

    write_to_csv(df_a, filename_a, open_file=1) 

    if len(df_b):
        print(f'   and {filename_b}, {len(df_b)} rows')
        write_to_csv(df_b, filename_b, open_file=0)

    ### Currently broken, writes file but is corrupted
    ###save_xlsx(df, 'XLSX_VAERS_FLATFILE.xlsx')



# Integrated from original: save_xlsx()
def save_xlsx(df, filename):
    pd.io.formats.excel.ExcelFormatter.header_style = None
    df = df.copy()
    df = df.reset_index(drop=True)

    # Currently broken, writes file but is corrupted
    # what a mess ...

    '''
    from unidecode import unidecode
    def FormatString(s):
        if isinstance(s, unicode):
          try:
            s.encode('ascii')
            return s
          except:
            return unidecode(s)
        else:
          return s
    df = df.map(FormatString)
    '''

    print('    Unicode decode')
    df = df.map(lambda x: x.encode('unicode_escape').decode('utf-8') if isinstance(x, str) else x)


    print(f'    Saving to {filename}')
    df1 = df.head(1048575)
    if len(df) > 1048576:
        df2 = df.tail(len(df) - (1048575 + 1))

    sheets = {
        'First million or so rows': df1,
        'Remaining rows': df2,
    }

    '''
    with pd.ExcelWriter('test.xlsx', engine='openpyxl') as writer:
        df1.to_excel(writer, sheet_name = 'Tab1', index = False)
        df2.to_excel(writer, sheet_name = 'Tab2', index = False)

    '''

    '''
    New problem with openpyxl:
    openpyxl.utils.exceptions.IllegalCharacterError: ... benadryl allergy. Aroun??d 11 pm t ...    '''
    #print('    ILLEGAL_CHARACTERS_RE')
    #ILLEGAL_CHARACTERS_RE = re.compile(r'[\000-\010]|[\013-\014]|[\016-\037]')
    #df = df.map(lambda x: re.sub(ILLEGAL_CHARACTERS_RE, '{}', x) if isinstance(x, str) else x)
    #df = df.map(lambda x: ILLEGAL_CHARACTERS_RE.sub(r'', x) if isinstance(x, str) else x)

    print(f'    Writing {filename}')
    with pd.ExcelWriter(filename, engine='openpyxl') as writer:
        for sheet in sheets:
            sheets[ sheet ].to_excel(writer, sheet_name = sheet, index = False)

    '''
        writer.book.use_zip64()
        workbook      = writer.book
        #workbook.use_zip64()
        ##workbook.use_zip64()   # due to size threshold hit
        header_styles = workbook.add_format({'font_name': 'Arial', 'font_size': 10, 'bold': False})
        cell_styles   = workbook.add_format({       # was test, now unused, didn't take why? Re-try, not yet tested.
            'font_name' : 'Arial',
            'font_size' : 10,
            'bold'      : False,
        })
        cell_styles = header_styles

        for sheet in sheets:
            sheets[sheet].to_excel(writer, sheet_name=sheet, index=False)
            worksheet = writer.sheets[ sheet ]

            for col_num, value in enumerate(sheets[sheet].columns.values):
                worksheet.write(0, col_num, value, cell_styles)

            worksheet.set_row(0, None, header_styles)
    '''

    return

    # engine options: openpyxl or
    with pd.ExcelWriter(filename, engine='xlsxwriter') as writer:
        workbook      = writer.book
        ##workbook.use_zip64()   # due to size threshold hit
        header_styles = workbook.add_format({'font_name': 'Arial', 'font_size': 10, 'bold': False})
        cell_styles   = workbook.add_format({       # was test, now unused, didn't take why? Re-try, not yet tested.
            'font_name' : 'Arial',
            'font_size' : 10,
            'bold'      : False,
        })



        for sheet in sheets:
            sheets[sheet].to_excel(writer, sheet_name=sheet, index=False)
            worksheet = writer.sheets[ sheet ]
            #workbook.use_zip64()   # due to size threshold hit. Here instead??? https://stackoverflow.com/a/48095021/962391 ... FAIL

            for col_num, value in enumerate(sheets[sheet].columns.values):
                worksheet.write(0, col_num, value, cell_styles)

            worksheet.set_row(0, None, header_styles)

        workbook.close() # help?



# Integrated from original: get_flattened()
def get_flattened(previous_or_working, raw_date):
    '''    Flattened file, previous or [newest] working [for compare]
              Depends on the files dictionary having been populated.         '''
    flattened_all_dates = sorted( files['flattened']['date'] )
    if flattened_all_dates:
        if previous_or_working == 'previous':
            candidates_previous = [ x for x in flattened_all_dates if x < raw_date ]
            if candidates_previous:
                return files['flattened']['keyval'][ candidates_previous[-1] ]  # most recent flattened file before this current being worked on, to compare to
            else:
                return('')          # can be first run, no previous flattened file, or first matching date_floor
        elif previous_or_working == 'working':
            if raw_date in files['flattened']['keyval']:
                return files['flattened']['keyval'][raw_date]      #                                   this current being worked on
            else:
                exit(f"    ERROR: Expected {raw_date} in files['flattened']['keyval']")
    else:
        exit(f'\n\n\n\n\t\t\t  ERROR: No flattened files using raw_date {raw_date}, unexpected \n\n\n\n')


    list_index_of_relative_to = files['input']['date'].index(raw_date)
    if list_index_of_relative_to == 0:      # there's no previous in this case, and no point in doing a compare to itself
        print(f'            ERROR: Index 0, no previous available')

    if len(files['flattened']['files']) == 0:
        print(f'\n\n\n\n\t\t\t  ERROR: No flattened files, unexpected \n\n\n\n')
        return('')

    if len(files['flattened']['files']) == 1:
        print(f"\n\n\n\n\t\t\t  ERROR: Only 1 flattened file, unexpected, it is {files['flattened']['files'][0]} \n\n\n\n")
        return('')

    if   previous_or_working == 'previous':
        target = files['flattened']['files'][-2]
    elif previous_or_working == 'working':
        target = files['flattened']['files'][-1]

    return target



# Integrated from original: linux_path()
def linux_path(x):
    return re.sub(r'\\', '/', x)



# Integrated from original: subrange()
def subrange(list_in, _max):
    '''    Input list and for print return up to _max (like 5) at the start and end of that list     '''
    if len(list_in) == 0:    return ''      # i.e. not empty square brackets
    if len(list_in) <= _max: return list_in
    list_in   = sorted(list_in)
    this_many = int(max(min(len(list_in) / 2, _max), 1))
    head      =  f'{list_in[:this_many]}'  if len(list_in) > 1 else ''
    head      = re.sub(r'\]', '', head)
    tail      =  f'{list_in[-this_many:]}' if len(list_in) > 1 else ''
    tail      = re.sub(r'\[', '', tail)
    return(f'{len(list_in):>7} {head} ... {tail}')



# Integrated from original: drop_dupes()
def drop_dupes(df):
    '''       '''
    len_before = len(df)    # No duplicate lines error check
    df = df.copy()
    df = df.drop_duplicates(subset=df.columns, keep='last')     # Should be none. Overwrite of values now done
    if len(df) - len_before:
        print(f'{(len_before - len(df)):>10} duplicates dropped in df_data on VAERS_IDs, SHOULD NOT HAVE HAPPENED, in write_to_csv')

    if len(df) - len_before:
        if not open:    # use of 'open' is for manual debugging, while in code during a run while testing can be overridden with open=1, ignore_dupes=1
            if not ignore_dupes:
                print(f'\n\n\n\n\t\t\t {(len_before - len(df)):>10} write_to_csv() {full_filename} complete duplicates dropped, SHOULD NOT HAPPEN \n\n\n\n')
                exit(f'\t\t\t Duplicates exist, this has to be fixed, exiting at {line()}\n\n\n')



# Integrated from original: nan_alert()
def nan_alert(df_in, col=None):
    '''    nan stands for not-a-number, meaning null, check to see if any are present in the dataframe (df_in)         '''

    if not 'DataFrame' in str(type(df_in)):
        print(f'    nan_alert() expect a DataFrame, got {str(type(df_in))}, skipping')
        return 0

    count_nans = 0

    if not len(df_in):
        count_nans =  0
    elif col is not None:     # if column specified, otherwise entire df_in
        df_partial = df_in.copy()[ ['VAERS_ID', col] ]
        columns_w_nans = df_partial.columns[df_partial.isna().any()].tolist()
        if col in columns_w_nans:
            print(f'ERROR: {col} with Nans UNEXPECTED')
            return 1
    else:
        columns_w_nans = df_in.columns[df_in.isna().any()].tolist()
        if columns_w_nans:
            print(f'\n\n\t  nans in {columns_w_nans}     line {inspect.stack()[1][2]}\n\n')
            try:
                df_with_nans = df_in[df_in.isna().any(axis=1)]  # TODO: Truly just rows with nan? https://stackoverflow.com/questions/43424199/display-rows-with-one-or-more-nan-values-in-pandas-dataframe
                count_nans = len(df_with_nans)
                print(f'        rows {count_nans}')
                ## TODO this, return df_in ... df_in.loc[df_in.VAERS_ID.isin(df_with_nans.VAERS_ID), 'trace' ] += f' . NaN'
            except Exception as e:
                print(df_in)
                error(f"df_in[df_in.isna().any(axis=1)]    {e}")
                error(f'rows with NANS    {inspect.stack()[1][2]}')
                print(f'\n\n\n\n\t\t\t rows with NANS    line {inspect.stack()[1][2]}    \n {df_in} \n\n\n\n')
                count_nans = 999999
    return count_nans



# Integrated from original: lookup()
def lookup(previous_date, date_currently, vid_cols_affected):
    '''     A tool for adhoc on-the-fly debugging while at a breakpoint.
               Show values for just the fields and VAERS_IDs for these compared files in dir_compared    '''
    print()
    print(f'debug lookup() on vids affected')
    files_populate_information()
    df_changes    = pd.DataFrame()
    files_list    = [ files['changes']['keyval'][previous_date], files['changes']['keyval'][date_currently] ]
    vids_all_list = list(vid_cols_affected.keys())
    cols_all      = []
    for k in vid_cols_affected:
        for col in vid_cols_affected[k]:
            if col not in cols_all:
                cols_all.append(col)

    for filename in files_list:
        df_tmp = open_file_to_df(filename, doprint=0).fillna('')     # [vids_all_list]
        df_tmp = df_tmp.loc[ df_tmp.VAERS_ID.isin(vids_all_list) ]
        df_tmp = df_tmp.copy()[['VAERS_ID'] + cols_all]
        df_tmp['date'] = date_from_filename(filename)
        df_changes = pd.concat([df_changes.reset_index(drop=True), df_tmp.reset_index(drop=True) ], ignore_index=True)

    df_changes = fix_date_format(df_changes)
    df_changes = move_column_forward(df_changes, 'date')
    df_changes = df_changes.sort_values(by=['VAERS_ID', 'date'])

    write_to_csv(df_changes, 'lookup_changes.csv', open_file=1)

    print()



# Integrated from original: do_replace()
def do_replace(d, col, tag, this, that):
    '''    Not essential, a utility for testing some prep for harvesting numbers.
              Makes regular expression replacements to remove false positives. Pared way down from its original elsewhere.    '''
    d2 = d.copy()       # ???
    print( f'    do_replace  {col} {tag:>40}  {this:>40}    {that}' )
    d2[col] = d2[col].str.replace(this, that, flags=re.IGNORECASE, regex=True)

    return d2



# Integrated from original: symptoms_file_entries_append_to_symptom_text()
def symptoms_file_entries_append_to_symptom_text(df_symfile):
    '''    Append symptoms entries from SYMPTOMS files to SYMPTOM_TEXT in symptoms column
            Files like 2023VAERSSYMPTOMS.csv     '''
    for col in list(df_symfile.copy().columns):        # limit to only SYMPTOM1 thru SYMPTOM5
        if 'VAERS_ID' in col:
            continue
        if 'SYMPTOM' not in col:
            del df_symfile[col]
            continue
        if 'VERSION' in col:
            del df_symfile[col]
    del col

    '''    Combine the symptoms column values to one string in a column called 'symptom_entries'

            Step 1: Five columns to just one    '''
    print('        Combining symptoms column items. Grouping with delimiters, single row per VAERS_ID ...')
    cols_symfile = sorted(set(df_symfile.columns) - set(['VAERS_ID']))

    print(f'        Appending each symptom in new column called symptom_entries ({NUM_CORES} cores)')
    # Pass DataFrame column-subset chunks to workers so astype(str)+sort run in parallel
    _df_cols = df_symfile[cols_symfile]
    _chunk_sz = max(1, len(_df_cols) // NUM_CORES)
    _df_chunks = [_df_cols.iloc[i:i + _chunk_sz] for i in range(0, len(_df_cols), _chunk_sz)]
    with Pool(min(NUM_CORES, len(_df_chunks))) as pool:
        if SHOW_PROGRESS:
            _joined = list(tqdm(
                pool.imap(_join_symptom_rows_chunk, _df_chunks),
                total=len(_df_chunks), desc="        Joining symptoms", unit="chunk", leave=True
            ))
        else:
            _joined = pool.map(_join_symptom_rows_chunk, _df_chunks)
    df_symfile['symptom_entries'] = [entry for chunk in _joined for entry in chunk]

    '''     Step 2: Multiple VAERS_ID rows to just 1 row each    '''

    df_symfile = df_symfile.reset_index(drop=True)
    df_symfile = df_symfile[['VAERS_ID', 'symptom_entries']]

    print(f'        Grouping by VAERS_ID ({NUM_CORES} cores)')
    _vids_u = df_symfile['VAERS_ID'].unique()
    if NUM_CORES > 1 and len(_vids_u) > NUM_CORES * 50:
        # Assign each unique VAERS_ID a chunk index via modulo, then map to all rows and
        # groupby that index — O(n) vs the old O(n×NUM_CORES) isin() loop.
        _chunk_map = dict(zip(_vids_u, np.arange(len(_vids_u)) % NUM_CORES))
        df_symfile['_chunk'] = df_symfile['VAERS_ID'].map(_chunk_map)
        _sym_chunks = [grp.drop(columns='_chunk').reset_index(drop=True)
                       for _, grp in df_symfile.groupby('_chunk', sort=False)
                       if len(grp)]
        df_symfile.drop(columns='_chunk', inplace=True)
        with Pool(min(NUM_CORES, len(_sym_chunks))) as pool:
            if SHOW_PROGRESS:
                _grp_results = list(tqdm(
                    pool.imap(_groupby_syms_chunk, _sym_chunks),
                    total=len(_sym_chunks), desc="        Grouping VAERS_ID", unit="chunk", leave=True
                ))
            else:
                _grp_results = pool.map(_groupby_syms_chunk, _sym_chunks)
        df_symfile = pd.concat(_grp_results, ignore_index=True)
    else:
        df_symfile = (df_symfile.astype(str)
                      .groupby('VAERS_ID').agg(list)
                      .map('_|_'.join).reset_index())

    print('        Cleaning multiple delimiters due to empty columns')
    df_symfile['symptom_entries'] = df_symfile.symptom_entries.str.replace(r'\s*_\|_\s*(?:\s*_\|_\s*)+\s*', '_|_', False, regex=True)  # multiples to single (where empty columns)

    '''    _|_Always at beginning and end_|_  ... like a box or cell    '''
    df_symfile['symptom_entries'] = df_symfile.symptom_entries.str.replace(r'^(?!_\|_)(.*)$', r'_|_\1', False, regex=True)   # _|_ at start of line always
    df_symfile['symptom_entries'] = df_symfile.symptom_entries.str.replace(r'^(.*)(?!_\|_)$', r'\1_|_', False, regex=True)   # _|_ at   end of line always

    df_long_cells = df_symfile.loc[ df_symfile.symptom_entries.str.len() >= 32720 ]    # "Both . xlsx and . csv files have a limit of 32,767 characters per cell"
    if len(df_long_cells):
        print(f'{len(df_long_cells):>10} over 32720 in length to be truncated')
        df_long_cells['symptom_entries'] = df_long_cells.symptom_entries.str.replace(r'^(.{32720,}).*', r'\1 \[truncated, Excel cell size limit 32,767\]', regex=True)
        df_symfile = copy_column_from_1_to_2_per_vids('symptom_entries', df_long_cells, df_symfile, df_long_cells.VAERS_ID.to_list())  # TODO: This was 'symptoms', was a bug, right?
    del df_long_cells

    return df_symfile



# Integrated from original: make_numeric()
def make_numeric(df_in, col):
    # There's been a lot of struggling with types and in the hands of garyha (me) in 2025 restart, still not all solved.
    if not len(df_in): return df_in
    df_in      = df_in.copy()
    df_in[col] = df_in[col].fillna('')
    df_in[col] = df_in[col].astype(str)

    try:
        df_in[col] = pd.to_numeric(df_in[col])
    except Exception as e:
        print(f'pd.to_numeric Exception {e}')

    # Desperation, number columns usually should be empty when 0, setting to 0.0 during run, empty them later
    df_in.loc[ (df_in[ col ].isna()), col ] = 0.0

    df_in[ col ] = df_in[ col ].astype('float64').round(4)

    if 'nan' in df_in[col].to_list():
        print(f'is_nan in make_numeric()')
    return df_in



# Integrated from original: copy_column_from_1_to_2_per_vids()
def copy_column_from_1_to_2_per_vids(column, df1, df2, vids):   # vids are VAERS_IDs
    for col in df1.columns:
        if col not in df2.columns:
            df2[col] = ''              # Avoid nans adding column empty string if didn't already exist
            print(f'    {col} made empty string in df2')

    df2_sequestered_not_vids = df2.loc[ ~df2.VAERS_ID.isin(vids) ]
    df1   = df1.loc[ df1.VAERS_ID.isin(vids) ]
    df2   = df2.loc[ df2.VAERS_ID.isin(vids) ]

    list_vids_both_only_to_copy = sorted( set(df1.VAERS_ID.to_list()) & set(df2.VAERS_ID.to_list()) )

    df2 = df2.copy()
    df2.loc[df2.VAERS_ID.isin(  list_vids_both_only_to_copy   ), column] = df2['VAERS_ID'].map(
        pd.Series(df1[column].values, index=df1.VAERS_ID).to_dict()
    )

    if len(df2_sequestered_not_vids):
        # Add back any that were in 2 but not in vids
        df2 = pd.concat([
            df2.reset_index(drop=True),
            df2_sequestered_not_vids.reset_index(drop=True),
        ], ignore_index=True)

    return df2


    # To be untouched
    df1_sequestered_not_vids = df1.loc[ ~df1.VAERS_ID.isin(vids) ]
    df2_sequestered_not_vids = df2.loc[ ~df2.VAERS_ID.isin(vids) ]
    df1   = df1.loc[ df1.VAERS_ID.isin(vids) ]
    df2   = df2.loc[ df2.VAERS_ID.isin(vids) ]

    # df1 and df2 are now only those that both have `vids` but this has become a mess, TODO

    # Only apply to common rows
    df1_not_in_df2   = df1.loc[ ~df1.VAERS_ID.isin(df2.VAERS_ID) ]
    df2_not_in_df1   = df2.loc[ ~df2.VAERS_ID.isin(df1.VAERS_ID) ]

    list_vids_both_only_to_copy = sorted( set(df1.VAERS_ID.to_list()) & set(df2.VAERS_ID.to_list()) )

    df1_common_subset = df1.loc[  df1.VAERS_ID.isin(list_vids_both_only_to_copy) ]
    df2_common_subset = df2.loc[  df2.VAERS_ID.isin(list_vids_both_only_to_copy) ]

    # Make the changes
    df2_common_subset = df2_common_subset.copy()

    print(f'copy_column {column} on {len(list_vids_both_only_to_copy)} common vids. {len(df1_not_in_df2)} extra in df1, {len(df2_not_in_df1)} extra in df2')

    df2_common_subset.loc[df2_common_subset.VAERS_ID.isin(  list_vids_both_only_to_copy   ), column] = df2_common_subset['VAERS_ID'].map(
        pd.Series(df1_common_subset[column].values, index=df1_common_subset.VAERS_ID).to_dict()
    )

    if len(df1_not_in_df2) or len(df2_not_in_df1):
        # Add back any that were in 2 but not in 1
        df2 = pd.concat([
            df1_not_in_df2.reset_index(drop=True),
            df2_common_subset.reset_index(drop=True),
            df2_not_in_df1.reset_index(drop=True),
            df2_sequestered_not_vids.reset_index(drop=True),
        ], ignore_index=True)

    return df2



# Integrated from original: move_rows()
def move_rows(df_subset, df_move_from, df_move_to):
    '''    Move rows in df_subset out of df_move_from into df_move_to
           Return the new df_move_from and df_move_to    '''
    if not len(df_subset) or not len(df_move_from):
        return df_move_from, df_move_to
    df_move_to   = df_move_to.copy()
    '''
    <string>:1: FutureWarning: The behavior of DataFrame concatenation with empty or all-NA entries is deprecated. 
        In a future version, this will no longer exclude empty or all-NA columns when determining the result dtypes. 
        To retain the old behavior, exclude the relevant entries before the concat operation.
    '''
    if not len(df_move_to):             # avoiding that warning above
        df_move_to = df_subset.copy()
    else:
        df_move_to   = pd.concat([df_move_to.reset_index(drop=True), df_subset.reset_index(drop=True) ], ignore_index=True)
    df_move_from = df_move_from.loc[ ~df_move_from.VAERS_ID.isin( df_move_to.VAERS_ID ) ]  # everything from before, not now in done
    return df_move_from, df_move_to



# Integrated from original: move_column_forward()
def move_column_forward(df_in, column):
    '''    Reorder columns moving column to second         '''
    if column not in df_in: return df_in
    columns_pre = list(df_in.columns)
    if columns_pre[1] == column: return df_in  # already in that spot

    if 'VAERS_ID' not in df_in.columns:
        col_order = [column]   # start
        for col in df_in.columns:
            if col == column: continue
            col_order.append(col)
        return df_in[col_order]  #df_in.reindex(col_order, axis=1)

    col_order = ['VAERS_ID', column]
    for c in columns_pre:
        if c not in col_order:
            col_order.append(c)
    return df_in.reindex(col_order, axis=1)



# Integrated from original: check_dupe_vaers_id()
def check_dupe_vaers_id(df):
    if df is None or (not len(df)):
        return 0
    if not 'VAERS_ID' in df.columns:
        return 0
    dupes_list = [ x for x,count in Counter(df.VAERS_ID).items() if count > 1 ]
    if dupes_list:
        print(f'\n\nline {inspect.stack()[ 1 ][ 2 ]}  WARNING: {len(dupes_list)}  D U P L I C A T E  VAERS_IDs:  {subrange(dupes_list, 6)}\n\n')
        return 1
    return 0



# Integrated from original: prv_new_error_check()
def prv_new_error_check(df_prv, df_new):
    '''    Ensuring the VAERS_IDs are identical in each    '''
    vids_in_prv   = df_prv.VAERS_ID.to_list()
    vids_in_new   = df_new.VAERS_ID.to_list()

    if sorted(set(vids_in_prv)) != sorted(set(vids_in_new)):
        print() ; print(f'    ERROR: Expected VAERS_ID in prv and new to match exactly')
        print(f'              len prv: {len(vids_in_prv)}')
        print(f'              len new: {len(vids_in_new)}')
        print(f'           Difference: {len(vids_in_new) - len(vids_in_prv)}')
        vids_in_prv_not_in_new = df_prv.loc[ ~df_prv.VAERS_ID.isin(df_new.VAERS_ID) ].VAERS_ID.to_list()
        vids_in_new_not_in_prv = df_new.loc[ ~df_new.VAERS_ID.isin(df_prv.VAERS_ID) ].VAERS_ID.to_list()
        print(f'    In new not in prv: {vids_in_new_not_in_prv}')
        print(f'    In prv not in new: {vids_in_prv_not_in_new}')
        print()
        return 1
    elif sorted(vids_in_prv) != sorted(vids_in_new):  # as lists
        print() ; print(f'    ERROR: VAERS_ID in prv are the same sets but there are multiple VAERS_ID in one')
        print(f'              len prv: {len(vids_in_prv)}')
        print(f'              len new: {len(vids_in_new)}')
        print(f'           Difference: {len(vids_in_new) - len(vids_in_prv)}')
        list_multiples_prv = [k[0] for k,v in df_prv[ ['VAERS_ID'] ].value_counts().to_dict().items() if v > 1]
        if list_multiples_prv:
            print(f'              VAERS_IDs: {list_multiples_prv}')
            print(f"              df_prv.loc[ df_prv.VAERS_ID.isin({list_multiples_prv}) ][['VAERS_ID', 'VAX_LOT', 'VAX_MANU', 'VAX_TYPE', 'VAX_NAME']]")
            df_multiples_prv = df_prv.loc[df_prv.VAERS_ID.isin(list_multiples_prv)][['VAERS_ID', 'VAX_LOT', 'VAX_MANU', 'VAX_TYPE', 'VAX_NAME']]
            print(f'{df_multiples_prv}')
        list_multiples_new = [k[0] for k,v in df_new[ ['VAERS_ID'] ].value_counts().to_dict().items() if v > 1]
        if list_multiples_new:
            print(f'              VAERS_IDs: {list_multiples_new}')
            print(f"df_new.loc[ df_new.VAERS_ID.isin({list_multiples_new}) ][['VAERS_ID', 'VAX_LOT', 'VAX_MANU', 'VAX_TYPE', 'VAX_NAME']]")
            df_multiples_new = df_new.loc[ df_new.VAERS_ID.isin(list_multiples_new) ][['VAERS_ID', 'VAX_LOT', 'VAX_MANU', 'VAX_TYPE', 'VAX_NAME']]
            print(f'{df_multiples_new}')
        exit('              Cannot continue with this discrepancy')



# Integrated from original: debug_breakpoint()
def debug_breakpoint( date, col, vids, debug_dates, debug_cols, debug_vids ):
    debug_pause = 0
    if date in debug_dates or not debug_dates:
        debug_pause += 1
    if col  in debug_cols  or not debug_cols:
        debug_pause += 1
    if debug_vids:
        vid_pause=0
        for dv in debug_vids:
            if dv in vids:
                vid_pause = 1
                break
        if vid_pause:
            debug_pause += 1
    if debug_pause >= 3:
        return 1         # a line for setting a breakpoint if conditions match
    return 0



# Integrated from original: diff_context()
def diff_context( prv, new, col, vid, date_currently ):
    '''    Inputs: String column values like in SYMPTOM_TEXT. Code reviewer: Is there a better way below? Surely.   '''
    delim = ''
    if len(prv) > 200:
        delim_count, delim = get_prevalent_delimiters(prv)
    a = diff_simplify( prv )
    b = diff_simplify( new )

    ''' Ignore punctuation and upper/lower case. [^a-zA-Z0-9|] clears everything not alphanumeric or pipe symbol (since VAX delim matters there)    '''
    if re.sub(r'[^a-zA-Z0-9|]', '', a.lower()) == re.sub(r'[^a-zA-Z0-9|]', '', b.lower()):
        return( '', '' )

    if (delim and delim_count >= 5) or col == 'symptom_entries':
        if col == 'symptom_entries':   # these are lightning fast
            delim = '_|_'
        list_prv = a.split(delim)
        list_new = b.split(delim)
        only_prv = [x for x in list_prv if x not in list_new]   # remove common
        only_new = [x for x in list_new if x not in list_prv]
        a = delim.join(only_prv)
        b = delim.join(only_new)
        '''   Neutralizing punctuation changes needed here too because like 1334938 comma to pipe ... Hallucination| auditory ... wacky stuff by them      '''
        if re.sub(r'[^a-zA-Z0-9|]', '', a.lower()) == re.sub(r'[^a-zA-Z0-9|]', '', b.lower()):
            return( '', '' )    # Ignore punctuation and upper/lower case
        if delim == '_|_':
            if sorted(only_prv) == sorted(only_new):
                return ('', '')     # Ignore reversed order
            if a: a = delim + a + delim   # endings
            if b: b = delim + b + delim
            return( a, b )
    context        = 6      # way low for cases like 1589954 to rid the word 'appropriate' by itself at len(largest_common_string) > context
    count_while    = 0
    continue_while = 1
    #length_max     = 50  deprecated
    while count_while <= 9 and continue_while:   # Whiddling strings down, removing common substrings
        count_while += 1   # Also consider if a in b: re.sub(a, '', b) ; a = '' and reverse for shorter no context
        if not a or not b:
            return( a, b )
        if a.lower() == b.lower():
            return( '', '' )
        '''    Important: If changes are the same with words just in a different order, skipping those.
                  Why? There are so many they would drown out changes. Applies for example to dose order changes, they are legion.     '''
        if delim:
            if delim == '|' and sorted(set(a.split('|'))) == sorted(set(b.split('|'))):    # vax fields, their delimiter is the pipe symbol, also removing duplicates
                return( '', '' )
            elif delim != '. ' and sorted(set(a.split(' '))) == sorted(set(b.split(' '))):    # ignore reversed order and repeat words
                return( '', '' )
        '''    To remove punctuation crudely:  from string import punctuation ;  ' '.join(filter(None, (word.strip(punctuation) for word in a.split()))) https://stackoverflow.com/a/15740656/962391
                    Ok, I give up. To avoid boatloads of problems, like commas replaced with pipe symbols in SYMPTOM_TEXT, going to compare/return just bare words for long strings
                    Avoid affecting things like 48.0
                    `punctuation` is just a string ... '!"#$%&\'()*+,-./:;<=>?@[\\]^_`{|}~'
                    punctuations =                     '!"#$%&()*+,-./:;<=>?@[\\]^_`{|}~'  ... removing the single quote, note the 's', plural, variable name.              '''
        if len(a) > context and len(b) > context:
            a = ' '.join(filter(None, (word.strip(''.join(punctuation)) for word in a.split())))  # consider not stripping ' .. ' instead but dicey
            b = ' '.join(filter(None, (word.strip(''.join(punctuation)) for word in b.split())))
        largest_string = longest_word_string(a, b)
        if largest_string and (len(largest_string) > context):  # and ((len(a) > length_max) or (len(b) > length_max)):
            a = re.sub(re.escape(largest_string), ' .. ', a, flags=re.IGNORECASE)
            b = re.sub(re.escape(largest_string), ' .. ', b, flags=re.IGNORECASE)
            if a.lower() == b.lower():
                return ('', '')

            a_list = [x.strip() for x in a.split()] ; b_list = [x.strip() for x in b.split()]

            only_a = [x for x in a_list if x not in b_list]
            only_b = [x for x in b_list if x not in a_list]
            a_list = [] ; b_list = []
            for x in only_a:
                if x in a_list:
                    continue
                a_list.append(x)
            for x in only_b:
                if x in b_list:
                    continue
                b_list.append(x)
            a = ' '.join(a_list) ; b = ' '.join(b_list)

            if not a or not b:
                return (a, b)
        else:
            continue_while = 0

    a = re.sub(r'\s\s+', ' ', a) ; b = re.sub(r'\s\s+', ' ', b)     # multiple spaces
    a = a.strip()                ; b = b.strip()                    # front and end spaces

    # remove multiples like ' cms hcc ..  cms hcc ..  cms hcc ..  cms hcc '
    if ' .. ' in a  or  ' .. ' in b:
        a_list = a.split('..') ; b_list = b.split('..')  # not a complete job on 1371552 but good enough for now
        a_list = [x.strip() for x in a_list] ; b_list = [x.strip() for x in b_list]
        # drop '' and ' '
        a_list = [x for x in a_list if re.search(r'\w', x)] ; b_list = [x for x in b_list if re.search(r'\w', x)]
        if len(a_list) != len(set(a_list)) or len(b_list) != len(set(b_list)):  # if multiples
            only_a = [x for x in a_list if x not in b_list]
            only_b = [x for x in b_list if x not in a_list]
            a_list = [] ; b_list = []
            for x in only_a:
                if x in a_list:
                    continue
                a_list.append(x)
            for x in only_b:
                if x in b_list:
                    continue
                b_list.append(x)
        a = ' .. '.join(a_list) ; b = ' .. '.join(b_list)
        a = re.sub(r'\s\s+', ' ', a) ; b = re.sub(r'\s\s+', ' ', b)  # necessary anymore after additions above?

    if re.sub(r'[^a-zA-Z0-9|]', '', a.lower()) == re.sub(r'[^a-zA-Z0-9|]', '', b.lower()):    # again
        return( '', '' )    # Ignore punctuation and upper/lower case

    return ( a, b )



# Integrated from original: diff_simplify()
def diff_simplify( x ):
    '''     Remove trivial stuff from string, etc        '''
    x = re.sub(r' `\^` '       , ''     , x)    # the repeat sentence replacements that were entered
    x = re.sub(r'[^\x00-\x7F]+', ' '    , x)    # unicode
    x = re.sub(r'\s\s+'        , ' '    , x)    # multiple spaces with single to avoid trivial
    x = re.sub(r"''"           , "'"    , x)    # doubled can mess things up like ... I''m ... in 1455992
    x = x.strip()                               # front and end spaces
    return x



# Integrated from original: get_prevalent_delimiters()
def get_prevalent_delimiters(prv_str):
    '''             '''
    max_delim = 0
    delim = ''
    for d in [  '. ',   '|',   '; ',   ' - '  ]:
        if d == '|' and '_|_' in prv_str:
            continue    # clumsy but these get crossed and '_|_' handled elsewhere
        count = prv_str.count(d)
        if count > max_delim:
            max_delim = count
            if max_delim >= 5:
                delim = d
    return( max_delim, delim )



# Integrated from original: longest_word_string()
def longest_word_string(a, b):
    '''    https://stackoverflow.com/a/42882629/962391         '''
    answer = []
    list_a = a.split()
    list_b = b.split()
    len1, len2 = len(list_a), len(list_b)
    for i in range(len1):
        for j in range(len2):
            lcs_temp = 0
            match = []
            while ((i+lcs_temp < len1) and (j+lcs_temp < len2) and list_a[i+lcs_temp].lower() == list_b[j+lcs_temp].lower()):
                match.append( list_b[j+lcs_temp].lower() )
                lcs_temp += 1
            if len(match) > len(answer):
                answer = match
    answer = ' '.join(answer)
    answer = answer.strip()
    return answer



# Integrated from original: find_all_context()
def find_all_context(search_string, df_in, df_in_column, df_out_column):
    search_string       = search_string          .lower()
    df_in[df_in_column] = df_in[df_in_column].str.lower()
    df_found            = df_in.loc[ df_in[df_in_column].str.contains(search_string, na=False) ]
    if len(df_found):
        df_found    = df_found.copy()
        str_context = r'\b(.{0,20}' + search_string + r'.{0,20})\b'
        df_found[df_out_column] = df_found[df_in_column].str.findall(str_context).str.join(' ~~ ')
        return df_found
    else:
        return pd.DataFrame()  # empty for len 0



# Integrated from original: do_autodownload()
def do_autodownload():
    '''     https://vaers.hhs.gov/data/datasets.html    '''
    if not autodownload:
        return

    sys.path.insert(0, './vaers_downloader')
    # enable when the time comes #  from VAERSFileDownloader import updateVAERSFiles

    updateVAERSFiles(
        needsUpdate = True,
        years       = [2020],
        workingDirectory = os.getcwd()
    )


# =============================================================================
# TOP-LEVEL WORKER FUNCTIONS (must be module-level for multiprocessing.Pool)
# =============================================================================

def _read_file_for_pool(filename_use_chunks):
    """Worker for parallel CSV reading."""
    filename, use_chunks = filename_use_chunks
    return open_file_to_df(filename, doprint=0, use_chunks=use_chunks)


def _dedupe_single_for_pool(vid_content):
    """Stateless symptom-dedup worker. Returns (vid, new_content, count, total_bytes).
    vaers36 data-fidelity hardening: on any error path or empty result when input had
    content, return the original content so we never silently drop SYMPTOM_TEXT etc.
    (The map+results assembly in the caller also has an explicit fallback.)
    """
    vid, content = vid_content
    vid = int(vid)
    orig_content = str(content)
    content = orig_content
    content = re.sub(r'^"', '', content)
    content = re.sub(r'"$', '', content)

    try:
        delim = get_prevalent_delimiters(content)[1]
        if not delim:
            return vid, orig_content, 0, 0

        list_in = re.split(re.escape(delim), content)
        if len(list_in) == len(set(list_in)):
            return vid, orig_content, 0, 0

        count_of_replacements = 0
        total_bytes = 0
        list_out = []
        for line in list_in:
            if line in list_out and len(line) > 40:
                count_of_replacements += 1
                total_bytes += len(line)
                line = '`^`'
            list_out.append(line)

        cleaned = delim.join(list_out)
        # Never return empty string if we started with content
        if not cleaned and orig_content:
            return vid, orig_content, 0, 0
        return vid, cleaned, count_of_replacements, total_bytes
    except Exception:
        # On any failure (huge text, weird delimiters after restore, regex, etc.)
        # preserve the original so the final output does not lose the report text.
        return vid, orig_content if orig_content else '', 0, 0


def _join_symptom_rows_chunk(df_chunk):
    """Worker: astype(str) + sort + join one DataFrame column-subset chunk into '_|_' rows.
    Receiving a DataFrame (not a pre-sorted array) lets the expensive astype+sort run in
    parallel across all worker processes instead of serially in the main process."""
    arr = np.sort(df_chunk.values.astype(str), axis=1)
    return ['_|_'.join(row) for row in arr]


def _agg_vax_chunk(chunk_df):
    """Worker: groupby-aggregate and join a VAERS_ID slice of the vax dataframe.
    Joining here avoids a second single-core map pass after pd.concat."""
    return chunk_df.groupby('VAERS_ID', sort=False).agg(list).map('|'.join)


def _merge_parallel(left, right, on='VAERS_ID', how='left'):
    """Split left DataFrame across NUM_CORES threads and merge each chunk with right.
    Threads share memory so right is never pickled — safe for large DataFrames."""
    if NUM_CORES <= 1 or len(left) < NUM_CORES * 500:
        return pd.merge(left, right, on=on, how=how)
    splits = np.array_split(left, NUM_CORES)
    with ThreadPoolExecutor(max_workers=NUM_CORES) as ex:
        chunks = list(ex.map(lambda chunk: pd.merge(chunk, right, on=on, how=how), splits))
    return pd.concat(chunks, ignore_index=True)


def _groupby_syms_chunk(df_chunk):
    """Reduce symptom rows: groupby VAERS_ID and join all values with _|_ delimiter.
    Runs in a worker process so df_chunk must contain only rows for a disjoint set
    of VAERS_IDs (guaranteed by the caller's array_split on unique IDs)."""
    return (df_chunk[['VAERS_ID', 'symptom_entries']]
            .astype(str)
            .groupby('VAERS_ID')
            .agg(list)
            .map('_|_'.join)
            .reset_index())


def _serialize_csv_chunk(chunk):
    """Serialize one DataFrame chunk to a CSV string (no header, no index)."""
    return chunk.to_csv(index=False, header=False)


# Module-level global for fork-based parallel CSV serialization.
# Set in write_to_csv() before creating the Pool; forked workers inherit via CoW.
_g_df_to_serialize = None

def _serialize_csv_slice(idx_pair):
    """Serialize one row-range slice of the fork-inherited global DataFrame.
    Receives only a tiny (start, stop) tuple over IPC — the large DataFrame is
    never pickled because forked worker processes inherit it from the parent."""
    start, stop = idx_pair
    return _g_df_to_serialize.iloc[start:stop].to_csv(index=False, header=False)


# Globals for parallel astype(str)+fillna and element-wise row comparison in compare().
_g_df_pair:  list = []   # [df_prv, df_new] — set before fork, inherited by workers
_g_pb_cmp         = None  # prv DataFrame slice for row comparison
_g_nb_cmp         = None  # new DataFrame slice for row comparison

def _astype_str_fillna_pair_chunk(args):
    """Convert a column-subset of one of two fork-inherited DataFrames to str+fillna.
    args = (df_index: 0 or 1, col_chunk: list[str])"""
    df_idx, col_chunk = args
    return _g_df_pair[df_idx][col_chunk].astype(str).fillna('')

def _compare_rows_col_chunk(col_chunk):
    """Return a boolean numpy array: True where every column in chunk is equal row-wise."""
    return (_g_pb_cmp[col_chunk] == _g_nb_cmp[col_chunk]).all(axis=1).to_numpy()


def _pool_astype_str(df0, df1, cols1=None):
    """Convert two DataFrames to str+fillna using a forked Pool (true multi-core).
    Workers inherit both DataFrames via CoW; only small column-subset results are
    pickled back.  cols1: restrict df1 to these columns (e.g. ['VAERS_ID', 'symptom_entries']).
    Returns (df0_str, df1_str)."""
    global _g_df_pair
    _g_df_pair = [df0, df1]
    c0 = df0.columns.tolist()
    c1 = cols1 if cols1 is not None else df1.columns.tolist()
    n0 = min(NUM_CORES, len(c0))
    n1 = min(NUM_CORES, len(c1))
    tasks = [(0, list(ch)) for ch in np.array_split(c0, n0)] + \
            [(1, list(ch)) for ch in np.array_split(c1, n1)]
    with Pool(min(NUM_CORES, len(tasks))) as pool:
        res = pool.map(_astype_str_fillna_pair_chunk, tasks)
    _g_df_pair = []
    return pd.concat(res[:n0], axis=1), pd.concat(res[n0:], axis=1)


# Module-level globals used by _build_col_arg_global.
# Set in compare() before forking the build-pool; forked workers inherit via CoW.
_g_prv_arrs:      dict = {}
_g_new_arrs:      dict = {}
_g_ids_np_global        = None
_g_col_date             = None
_g_col_test             = None

def _build_col_arg_global(col):
    """Extract changed rows for one column using fork-inherited globals (no IPC pickling).
    Returns (col, ids_np, prv_arr, new_arr, date, test) — all numpy arrays."""
    prv_arr  = _g_prv_arrs[col]
    new_arr  = _g_new_arrs[col]
    ids_np   = _g_ids_np_global
    both_nan = pd.isnull(prv_arr) & pd.isnull(new_arr)
    chg_mask = (prv_arr != new_arr) & ~both_nan
    ids_chg  = ids_np[chg_mask]
    return (col, ids_chg, prv_arr[chg_mask], new_arr[chg_mask], _g_col_date, _g_col_test)


def _compare_col_worker(args):
    """
    Process one column's changes.
    Returns (col, row_updates, bulk_blank, col_stats, print_lines)
      row_updates  : list of (vid, edits_add, changes_note, col_val_or_None)
      bulk_blank   : None | {'vids': [...], 'prv_vals': {vid: val}, 'cells_emptied': int,
                              'message': str, 'col': str, 'date': str}
      col_stats    : {'cells_edited': int, 'cells_emptied': int,
                      'trivial_changes_ignored': int, 'col_count': int}
      print_lines  : list of strings
    """
    (col, ids_np, prv_arr, new_arr, date_currently, use_test_cases_flag) = args

    row_updates  = []
    bulk_blank   = None
    col_stats    = {'cells_edited': 0, 'cells_emptied': 0,
                    'trivial_changes_ignored': 0, 'col_count': 0}
    print_lines  = []

    if use_test_cases_flag and col == 'FORM_VERS':
        return col, row_updates, bulk_blank, col_stats, print_lines

    # Rebuild minimal DataFrames for this column from pre-filtered numpy arrays
    col_prv = col + '_prv'
    col_new = col + '_new'

    df_slice_prv = pd.DataFrame({'VAERS_ID': ids_np, col: prv_arr}).sort_values('VAERS_ID').reset_index(drop=True)
    df_slice_new = pd.DataFrame({'VAERS_ID': ids_np, col: new_arr}).sort_values('VAERS_ID').reset_index(drop=True)

    vids_changed_this_col = df_slice_new.loc[
        df_slice_new[col].ne(df_slice_prv[col])
    ].VAERS_ID.to_list()

    if not vids_changed_this_col:
        return col, row_updates, bulk_blank, col_stats, print_lines

    df_three_columns = pd.merge(
        df_slice_prv.loc[df_slice_prv.VAERS_ID.isin(vids_changed_this_col)],
        df_slice_new.loc[df_slice_new.VAERS_ID.isin(vids_changed_this_col)],
        on='VAERS_ID', suffixes=('_prv', '_new')
    )

    # Bulk-blanking shortcut (200+ rows cleared)
    df_3_made_blank = df_three_columns.loc[
        df_three_columns[col_prv].ne('') & df_three_columns[col_new].eq('')
    ]
    if len(df_3_made_blank) >= 200:
        vids_list = df_3_made_blank.VAERS_ID.to_list()
        prv_vals  = dict(zip(df_3_made_blank.VAERS_ID, df_3_made_blank[col_prv]))
        total_prv_bytes = df_3_made_blank[col_prv].astype(str).map(len).sum()
        message = (f"{len(df_3_made_blank)} rows with {total_prv_bytes} total bytes MADE BLANK")
        col_padded = (' ' * (20 - len(col))) + col
        print_lines.append(f'    {len(df_3_made_blank):>10} {col} blanked out, noting in bulk, omitting their VAERS_IDs')
        print_lines.append(f'{"":>10} {col_padded} {"":>8} {message:>70} <> [] ')
        col_stats['cells_emptied'] += len(vids_list)
        bulk_blank = {'vids': vids_list, 'prv_vals': prv_vals,
                      'cells_emptied': len(vids_list),
                      'message': message, 'col': col, 'date': date_currently}
        df_three_columns = df_three_columns.loc[~df_three_columns.VAERS_ID.isin(vids_list)]
        if not len(df_three_columns):
            return col, row_updates, bulk_blank, col_stats, print_lines

    # Trivial-change filter
    len_before = len(df_three_columns)
    if 'DATE' in col:
        df_three_columns = df_three_columns.loc[
            df_three_columns[col_prv].astype(str).str.replace(
                r'^(?:0*\d\/\d\d|\d\d\/0*\d|0*\d\/0*\d)\/\d{4}', '', regex=True)
            != df_three_columns[col_new].astype(str).str.replace(
                r'^(?:0*\d\/\d\d|\d\d\/0*\d|0*\d\/0*\d)\/\d{4}', '', regex=True)
        ]
    else:
        df_three_columns = df_three_columns.loc[
            df_three_columns[col_prv].astype(str).str.replace(r'[\W_]', '', regex=True)
            != df_three_columns[col_new].astype(str).str.replace(r'[\W_]', '', regex=True)
        ]

    count_trivial = len_before - len(df_three_columns)
    if count_trivial:
        cellword = 'cell' if count_trivial == 1 else 'cells'
        col_padded = (' ' * (20 - len(col))) + col
        print_lines.append(f'{col_padded} {count_trivial:>7} {count_trivial} {cellword} of trivial non-letter differences ignored')
        col_stats['trivial_changes_ignored'] += count_trivial
    if not len(df_three_columns):
        return col, row_updates, bulk_blank, col_stats, print_lines

    df_three_columns = df_three_columns.fillna('')
    try:
        df_uniq = df_three_columns.groupby([col_prv, col_new])[
            ['VAERS_ID', col_prv, col_new]
        ].transform(lambda x: ' '.join(x.astype(str).unique()))
    except Exception:
        return col, row_updates, bulk_blank, col_stats, print_lines

    df_uniq = df_uniq.drop_duplicates(subset=df_uniq.columns).copy()
    df_uniq['VAERS_IDs'] = df_uniq.VAERS_ID.str.split(' ')
    df_uniq = df_uniq[['VAERS_IDs', col_prv, col_new]]
    df_uniq['len'] = df_uniq[col_new].apply(len)
    df_uniq = df_uniq.sort_values(by=list(set(df_uniq.columns) - {'VAERS_IDs'}))

    blank_brackets = '[]'
    col_padded     = (' ' * (20 - len(col))) + col

    for _, row in df_uniq.iterrows():
        vids_list      = sorted([int(x) for x in row['VAERS_IDs']])
        vid            = vids_list[0]
        val_prv        = row[col_prv]
        val_new        = row[col_new]
        val_prv_ori    = val_prv
        val_new_ori    = val_new
        newly_cut      = 1 if val_prv and (not val_new) and ('cut_' not in val_prv) else 0
        continuing_cut = val_prv if 'cut_' in val_prv and not val_new else ''
        restored       = ''
        if 'cut_' in val_prv and val_new and val_prv.startswith(val_new):
            restored = ' [restored]'

        if newly_cut:
            col_stats['cells_emptied'] += len(vids_list)

        if col in ['AGE_YRS', 'CAGE_YR', 'CAGE_MO', 'HOSPDAYS', 'NUMDAYS']:
            val_new = re.sub(r'\.0$', '', str(val_new))
            val_prv = re.sub(r'\.0$', '', str(val_prv))

        if val_prv and val_new:
            if re.sub(r'\W', '', val_prv) == re.sub(r'\W', '', val_new):
                continue
            val_prv, val_new = diff_context(val_prv, val_new, col, vid, date_currently)
        elif not val_prv and not val_new:
            continue

        if not continuing_cut:
            col_stats['cells_edited'] += len(vids_list)
            col_stats['col_count']    += len(vids_list)

        vids_len_to_print  = len(vids_list) if len(vids_list) > 1 else ''
        vids_list_to_print = [int(x) for x in vids_list] if len(vids_list) > 1 else ''
        vid_to_print       = vid if len(vids_list) == 1 else ''

        val_prv_print = val_prv
        val_new_print = val_new
        try:
            if ' cut_' in val_prv_ori:
                if continuing_cut:
                    val_prv_print = val_prv_ori
                elif restored:
                    val_prv_print = "''"
                val_prv_print = re.sub(r' cut_.*$', '', val_prv_print)
            if not val_new_print and len(val_prv_print) > 100:
                prv_excerpt = re.sub(r'^(.{1,40}.*?)\b.*', r'\1', val_prv_print) + f' ... ~{len(val_prv_print) - 40} more'
                if not continuing_cut:
                    print_lines.append(f'{vids_len_to_print:>10} {col_padded} {vid_to_print:>8} {prv_excerpt:>70} <> {val_new_print}{restored} {vids_list_to_print}')
            else:
                if not continuing_cut:
                    print_lines.append(f'{vids_len_to_print:>10} {col_padded} {vid_to_print:>8} {val_prv_print:>70} <> {val_new_print}{restored} {vids_list_to_print}')
        except Exception:
            val_prv_print = re.sub(r'[^\x00-\x7F]+', '_', str(val_prv_print))
            val_new_print = re.sub(r'[^\x00-\x7F]+', '_', str(val_new_print))
            print_lines.append(f'{vids_len_to_print:>10} {col_padded} {vid_to_print} {val_prv_print:>70} <> {val_new_print}{restored} {vids_list_to_print}')

        for vid in vids_list:
            edits_add        = 0
            val_to_keep      = val_prv if (len(val_prv) and not len(val_new_ori)) else val_new_ori
            the_changes_note = ''

            if newly_cut:
                edits_add        = 1
                val_to_keep      = f'{val_to_keep} cut_{date_currently}   '
                the_changes_note = f'{col} cut_{date_currently}    '
            elif continuing_cut:
                val_to_keep = val_prv_ori
            else:
                if val_new and 'cut_' in val_prv_ori:
                    val_prv_c = re.sub(r' cut_.*$', '', val_prv)
                    if val_prv_c == val_new:
                        val_prv = "''"
                if not val_prv and not val_new:
                    edits_add = 0
                    the_changes_note = ''
                else:
                    if not val_prv: val_prv = '[]'
                    if not val_new: val_new = '[]'
                    edits_add = 1
                    the_changes_note = f'{col} {date_currently}: {val_prv} <> {val_new}    '

            row_updates.append((vid, edits_add, the_changes_note,
                                val_to_keep if val_to_keep else None))

    return col, row_updates, bulk_blank, col_stats, print_lines


# Integrated from original: symptoms_dedupe_repeat_sentences()
def symptoms_dedupe_repeat_sentences(_data):
    '''    Remove repeat sentences using all CPU cores    '''
    print(f'    Repeat sentence removal in SYMPTOM_TEXT ({NUM_CORES} cores)')
    _data = _data.copy()

    if len(_data):
        pairs = list(zip(_data['VAERS_ID'].tolist(), _data['SYMPTOM_TEXT'].tolist()))
        chunksize = max(1, len(pairs) // (NUM_CORES * 4))

        with Pool(NUM_CORES) as pool:
            if SHOW_PROGRESS:
                results = list(tqdm(
                    pool.imap(_dedupe_single_for_pool, pairs, chunksize=chunksize),
                    total=len(pairs), desc="    Deduping SYMPTOM_TEXT", unit="report", leave=True
                ))
            else:
                results = pool.map(_dedupe_single_for_pool, pairs, chunksize=chunksize)

        content_map = {}
        for vid, new_content, count_replacements, total_bytes in results:
            content_map[vid] = new_content
            if count_replacements and vid not in writeups_deduped:
                stats['dedupe_bytes']   += total_bytes
                stats['dedupe_reports'] += 1
                stats['dedupe_count']   += count_replacements
                writeups_deduped[vid]    = 1
                if total_bytes > stats['dedupe_max_bytes']:
                    stats['dedupe_max_bytes'] = total_bytes
                    stats['dedupe_max_vid']   = vid

        # Data-fidelity safety (vaers36 fix for vaers35 bug):
        # The parallel Pool+imap+map assembly for _dedupe_single_for_pool could
        # (for very long texts, certain restored reports, or edge delim cases)
        # result in missing keys or '' in content_map for VIDs that had content.
        # This caused SYMPTOM_TEXT (and potentially other long fields after dedupe)
        # to become blank in the FLATTENED / subsequent FLATFILE / FINAL_MERGED
        # for many high cell_edits / deleted+restored reports — exactly the
        # symptom vs. the reference orig.py outputs.
        # Guard: fall back to the pre-dedupe text for any VID that would go blank.
        pre_sym = _data['SYMPTOM_TEXT'].copy()
        _data['SYMPTOM_TEXT'] = _data['VAERS_ID'].map(content_map)
        blank_after = _data['SYMPTOM_TEXT'].isna() | (_data['SYMPTOM_TEXT'] == '')
        had_content = pre_sym.notna() & (pre_sym != '')
        restore_mask = blank_after & had_content
        if restore_mask.any():
            _data.loc[restore_mask, 'SYMPTOM_TEXT'] = pre_sym[restore_mask]
            print(f"    [vaers36 data-fidelity] Restored SYMPTOM_TEXT for {int(restore_mask.sum())} VIDs that the parallel dedupe map would have blanked.")
        _data['SYMPTOM_TEXT'] = _data['SYMPTOM_TEXT'].fillna('')

        print()
        to_print  = f"{stats['dedupe_count']:>10} SYMPTOM_TEXT field repeat sentences deduped in "
        to_print += f"{stats['dedupe_reports']} reports, max difference {stats['dedupe_max_bytes']} bytes in VAERS_ID {stats['dedupe_max_vid']}"
        print(to_print) ; print()
    return _data



# Integrated from original: symptoms_dedupe_repeat_sentences_each()
def symptoms_dedupe_repeat_sentences_each(series_vid_sym):
    '''     Remove repeat sentences in VAERS SYMPTOM_TEXT fields

                Prints each time a larger change occurs.
                Originated at https://stackoverflow.com/a/40353780/962391           '''

    # Use delim_count, delim = get_prevalent_delimiters(prv)

    # In 2025 ... :1818: FutureWarning: Series.__getitem__ treating keys as positions is deprecated.
    #     In a future version, integer keys will always be treated as labels (consistent with DataFrame behavior).
    #     To access a value by position, use `ser.iloc[pos]`
    vid      = int(series_vid_sym.iloc[0])  # 2025 force int, is this a mistake? TODO.
    content  = str(series_vid_sym.iloc[1])  # sometimes not string
    '''   Some of the input files filled in by various people (at CDC) evidently might have gone through some processing in Excel by them.
             As a result, fields have double quotes injected, they have to be removed or big trouble, takes time but required.     '''
    content = re.sub(r'^"', '', content)  # not certain about these
    content = re.sub(r'"$', '', content)

    delim = get_prevalent_delimiters(content)[1]
    if not delim:
        return content

    list_in    = re.split(re.escape(delim), content)
    unique_set = set(list_in)
    dupe_count = len(list_in) - len(unique_set)
    if not dupe_count:
        return content      # no dupes

    count_of_replacements = 0
    total_bytes = 0
    list_out = []
    for line in list_in:
        #if '`^`' in line:   # previously processed by mistake?
        #    continue
        if line in list_out and (len(line) > 40):   # only dedupe longish lines
            #try:
            #    print(f'    {line}')
            #except Exception as e:
            #    pass    # UnicodeEncodeError: 'charmap' codec can't encode character '\ufffd' in position 4: character maps to <undefined>

            count_of_replacements += 1
            total_bytes += len(line)
            if vid not in writeups_deduped:
                stats['dedupe_bytes'] += len(line)
            line = '`^`'
        list_out.append(line)

    if total_bytes > stats['dedupe_max_bytes']:
        stats['dedupe_max_bytes'] = total_bytes
        stats['dedupe_max_vid'  ] = vid

    content = delim.join(list_out)

    # stats
    if vid not in writeups_deduped:                # count each report only once, first time seen
        stats['dedupe_reports']  += 1              # count of reports that ever had any repeat sentences
        writeups_deduped[vid] = 1                  # for unique vid tracking
        #count_of_replacements = content.count('`^`')
        if count_of_replacements:
            stats['dedupe_count'] += count_of_replacements

    return content  # modified

    # -  -  -  -  -  -  -  -  -  -  -  -  -  -  -

    # BELOW SEEMS TO HAVE FLAWS
    # string_split_multiple_delims = re.split('[:;,\|]\s*', content)  # doesn't preserve them
    list_in = re.split(r'[^a-zA-Z0-9\'\s]', content)    # split on everything except alphanumeric, space etc
    list_in = [x for x in list_in if len(x) >= 40]      # just the longer strings
    dupes_list = [x for x, count in Counter(list_in).items() if count > 1]
    if not dupes_list:
        return content

    # Doing dedupe counts only once per VAERS_ID
    # Replace all but the first with  `^` , low tech solution: https://stackoverflow.com/a/53239962/962391
    total_bytes = 0
    for dup in dupes_list:
        content = content.replace(re.escape(dup), ' `^` ').replace(' `^` ', dup, 1)
        total_bytes += len(dup)
        if vid not in writeups_deduped:
            stats['dedupe_bytes'] += len(dup)

    if total_bytes > stats['dedupe_max_bytes']:
        stats['dedupe_max_bytes'] = total_bytes
        stats['dedupe_max_vid'  ] = vid

    # stats
    if vid not in writeups_deduped:   # count each report only once, first time seen
        stats['dedupe_reports']  += 1     # count of reports that ever had any repeat sentences
        writeups_deduped[vid] = 1                  # for unique vid tracking
        count_of_replacements = content.count(' `^` ')
        if count_of_replacements:
            stats['dedupe_count'] += count_of_replacements

    return content  # modified



# Integrated from original: get_next_date()
def get_next_date():
    '''    Next date has to be the next flattened needed to be done. Compare uses flattened files.      '''

    # TODO: This has changed, there's surely some dead code here now that would never be hit, to be removed. Also find ... cheap trick, this is part of it. Slopppy ...
    dates_input   = sorted( [x for x in files['input'  ]['date'] if x not in files['done']])   # added 'done' in 2025
    dates_changes = sorted(files['changes']['date'])

    if not files['flattened']['keyval']:  # first run
        return dates_input[0]

    # Comparing to inputs (to do)
    if dates_changes:
        high_changes_done = dates_changes[-1]
        changes_next_candidates = [x for x in dates_input if x > high_changes_done]
        if changes_next_candidates:

            date_next = changes_next_candidates[0]

            if date_ceiling and ( date_next > date_ceiling ):
                print(f'\n\n\n\n\t\t\t date_ceiling is set and was reached: date_next {date_next} > {date_ceiling} \n\n\n\n')
                return None

            return date_next
        else:
            '''    Save final output as two, split in half, due to Excel row limit of about 1.048 million per sheet.     '''
            # dead no? if len(df_flat_prv):
            # dead no?     print() ; print('        save_multi_csv(high_changes_done, df_flat_prv)') ; print()
            # dead no?     save_multi_csv(high_changes_done, df_flat_prv)

            if date_ceiling:
                print(f'\n\n\n\n\t\t\t date_ceiling is {date_ceiling}, highest changes: {high_changes_done} \n\n\n\n')
            else:
                print('\n    No more input files to process')
            return None

    else:
        '''   Confusions. Whole thing needs a lot of conditions applied.
                If there are no changes files, can still be consolidated and flattened.
                If no changes, just return the first as next? No, that's a loop.
        '''

        if files['flattened']['date']:
            if dates_input[0] in files['flattened']['date']:
                # Initiating, copy it

                date_currently = dates_input[0]

                print_date_banner(date_currently)

                file_for_copy_original = dir_compared + '/' + date_currently + '_VAERS_FLATFILE.csv'
                print(
                    f'        Due to first drop, creating from flattened: {file_for_copy_original}')
                print()
                file_flattened = files['flattened']['keyval'][date_currently]
                shutil.copy(file_flattened, file_for_copy_original)   # copyfile() doesn't include permissions so it can result in problems

                # The file has to be read for VAERS_IDs for ever_covid to be accurate
                print() ; print(f'    Reading original for ever_covid bookkeeping')
                df = open_file_to_df(file_flattened, doprint=1)

                # Initial row in stats file when only one compare/changes file
                stats_initialize (date_currently)  # per drop, then a totals row is calculated
                do_never_ever( df.VAERS_ID.to_list(), date_currently, 'get_next_date on file_flattened' )
                do_ever_covid( sorted(set(df.VAERS_ID.to_list())) )
                stats_resolve(date_currently)

                return dates_input[1]   # NEXT ONE

        return dates_input[0]

        consolidated_not_done = sorted( set(files['input']['date']) - set(files['consolidated']['date']) )
        if consolidated_not_done:
            date_todo = sorted(consolidated_not_done)[0]
            print(f'    Consolidation to do: {date_todo}')
            return date_todo

        flattened_not_done = sorted( set(files['input']['date']) - set(files['flattened']['date']) )
        if flattened_not_done:
            date_todo = sorted(flattened_not_done)[0]
            print(f'    Flattening to do: {date_todo}')
            return date_todo

        latest_flattened_done = sorted(files['flattened']['date'])[-1] if files['flattened']['date'] else ''
        dates_list = [x for x in dates_input if x > latest_flattened_done]
        date_next  = dates_list[0]

        if len(dates_list) == 0:        # No more to process
            print()
            exit('    No more files to process')
        elif date_ceiling and ( date_next > date_ceiling ):
            exit(f'\n\n\n\n\t\t\t date_ceiling is set and was reached: date_next {date_next} > {date_ceiling} \n\n\n\n')
        else:
            return date_next



# Integrated from original: more_to_do()
def more_to_do():
    '''    Return 1 or 0 on whether any more input files process         '''
    global elapsed_drop, df_flat_1

    files_populate_information()

    if not files['changes']['date']:    # None done yet
        return 1

    if files['input']['date'] == files['flattened']['date'] == files['consolidated']['date'] == files['changes']['date']:
        if files['changes']['date'][-1] >= files['input']['date'][-1]:
            date_currently = files['changes']['date'][-1]
            if df_flat_1 is not None and len(df_flat_1):
                pass
                #save_multi_csv(date_currently, df_flat_1)   # more than one csv due to Excel row limit
            else:
                prv_date     = files['changes']['date'][-1]
                filename     = files['changes']['keyval'][prv_date]
                print(f'Patching for creation of split changes file _A and _B, no more to do case unusual, using {filename}')
                df_flat_1 = open_file_to_df(filename, doprint=0)

            print() ; print('        save_multi_csv(date_currently, df_flat_1)') ; print()
            save_multi_csv(date_currently, df_flat_1)   # more than one csv due to Excel row limit

            print()
            print(f"        No more to do, last set {files['changes']['date'][-1]} >= {files['input']['date'][-1]} done")
            return 0

    elapsed_drop = _time.time()    # new start marker for total time on each drop

    return 1



# Integrated from original: print_date_banner()
def print_date_banner(this_drop_date, file_num=None, total_input_files=None):
    _INV  = '\033[7m'   # reverse-video (invert fg/bg)
    _RST  = '\033[0m'   # reset
    progress_str = ''
    if file_num is not None and total_input_files is not None:
        files_left  = total_input_files - file_num
        pct         = 100 * file_num / total_input_files if total_input_files else 0
        progress_str = (
            f'  {_INV} File {file_num}/{total_input_files} '
            f'— {files_left} left  ({pct:.0f}%) {_RST}'
        )
    print()
    print('= = = = = = = = = = = = = = = = = = = = = = = = = = = = = = ')
    print(f'    Next date {this_drop_date}{progress_str}')
    print('= = = = = = = = = = = = = = = = = = = = = = = = = = = = = = ')
    print()



# Integrated from original: consolidate()
def consolidate(files_date_marker):
    '''
        =  =  =  =  =  =  =  =  =  =  =  =  =  =  =  =  =  =  =  =  =  =  =  =  =  =  =  =  =  =
        Consolidate -- All in one file but still multiple rows per VAERS_ID sometimes.
        =  =  =  =  =  =  =  =  =  =  =  =  =  =  =  =  =  =  =  =  =  =  =  =  =  =  =  =  =  =        '''
    global df_vax, df_data, df_syms_flat, ever_covid, ever_any, covid_earliest_vaers_id

    print() ; print(f'    Consolidation')

    files_populate_information()

    '''    Consolidate only if the file doesn't already exist    '''
    if files_date_marker in files['consolidated']['date']:
        if files_date_marker in files['consolidated']['keyval']:       # already done, use it
            print(f"        Consolidation already done: {files['consolidated']['keyval'][files_date_marker]}")
            df_vax = pd.DataFrame()     # avoid picking up a previous consolidation when flattening
            return
        else:
            print(f"    ERROR: Expected {files_date_marker} in files['consolidated']['keyval']")

    if files_date_marker in files['flattened']['date']:
        print(f'        Skipping consolidation because flattened for {files_date_marker} already exists')
        return

    '''    Combine all data, vax and symptoms files as one
                (if not already a file like 2020-12-25_VAERS_CONSOLIDATED.csv in which case just do the treament)     '''
    print(f'    Reading VAERS files across {NUM_CORES} cores: *VAERSDATA.csv *VAERSVAX.csv *VAERSSYMPTOMS.csv')
    vax_glob  = glob.glob(dir_working + '/' + '*VAERSVAX.csv')
    syms_glob = glob.glob(dir_working + '/' + '*VAERSSYMPTOMS.csv')
    data_glob = glob.glob(dir_working + '/' + '*VAERSDATA.csv')

    # Read all three file groups with a single Pool so every core is busy
    all_input_files = vax_glob + syms_glob + data_glob
    _workers = min(NUM_CORES, max(1, len(all_input_files)))
    _args = [(f, False) for f in all_input_files]
    with Pool(_workers) as pool:
        if SHOW_PROGRESS:
            _all_dfs = list(tqdm(
                pool.imap(_read_file_for_pool, _args),
                total=len(all_input_files), desc="    Reading VAERS files", unit="file", leave=True
            ))
        else:
            _all_dfs = pool.map(_read_file_for_pool, _args)

    _vax_set = set(vax_glob); _syms_set = set(syms_glob); _data_set = set(data_glob)
    def _concat_group(flist):
        dfs = [df for f, df in zip(all_input_files, _all_dfs)
               if f in flist and df is not None and len(df) > 0]
        return pd.concat(dfs, ignore_index=True) if dfs else pd.DataFrame()

    df_vax      = _concat_group(vax_glob)
    df_syms     = _concat_group(syms_glob)
    df_data_all = _concat_group(data_glob)

    '''    Remove all reports prior to the first covid jab. [That was the thinking but now in 2025 with all CDC drops (at last), widen to all vaccines?]
                2020-12-14  902418  First report after public rollout is next day:  https://www.medalerts.org/vaersdb/findfield.php?IDNUMBER=902418&WAYBACKHISTORY=ON
                2020-10-02  896636  may have been from a trial:                     https://www.medalerts.org/vaersdb/findfield.php?IDNUMBER=896636&WAYBACKHISTORY=ON
                    VAERS ID:       896636
                    VAERS Form:     2
                    Age:            47.0
                    Sex:            Female
                    Location:       South Carolina

                    Vaccinated:    2020-09-28
                    Onset:         2020-10-02
                    Submitted:      0000-00-00
                    Entered:        2020-11-14
                    Vaccination / Manufacturer (1 vaccine)            Lot / Dose        Site / Route
                    COVID19: COVID19 (COVID19 (MODERNA)) / MODERNA    - / UNK            LA / SYR

                2020-12-10  970043  Died same day, reported later:                  https://www.medalerts.org/vaersdb/findfield.php?IDNUMBER=970043&WAYBACKHISTORY=ON
    '''

    # Automatically detect earliest covid VAERS_ID. TODO: Do earlier in the code?
    df_vax_all_covid = df_vax.loc[ df_vax[ 'VAX_TYPE' ].str.lower().str.contains('covid', na=False) ]
    if len(df_vax_all_covid):
        covid_earliest_vaers_id_now = df_vax_all_covid.VAERS_ID.astype(int).min()
        if covid_earliest_vaers_id == 0:    # no previous VAERS_ID value
            covid_earliest_vaers_id = covid_earliest_vaers_id_now
            print(f'          Earliest covid VAERS_ID initiated at {covid_earliest_vaers_id}\n')
        elif covid_earliest_vaers_id > covid_earliest_vaers_id_now:
            # was previously set but is even lower now (altho published later)
            print(f'          Earliest covid VAERS_ID set EVEN LOWER now at {covid_earliest_vaers_id}, was {covid_earliest_vaers_id}\n')
            covid_earliest_vaers_id = covid_earliest_vaers_id_now
    else:
        print(f'          No covid found in {files_date_marker}\n')
        # Cheap trick to avoid running the same input again, and this should be improved. Quite the logic puzzle. Trimming files_date_marker from inputs list.
        files[ 'input' ][ 'date' ] = [x for x in files['input'  ]['date'] if x != files_date_marker]
        return

    stats['lo_ever'] = covid_earliest_vaers_id   # TODO: 2025, wrong? right?

    if vids_limit:   # in testing special case
        df_data_all = df_data_all.loc[ df_data_all.VAERS_ID.isin(vids_limit) ]
        df_vax      = df_vax     .loc[ df_vax     .VAERS_ID.isin(vids_limit) ]
        df_syms     = df_syms    .loc[ df_syms    .VAERS_ID.isin(vids_limit) ]

    len_before  = len(df_data_all)
    df_data_all = df_data_all.loc[ df_data_all.VAERS_ID >= covid_earliest_vaers_id ]
    df_vax      = df_vax     .loc[ df_vax     .VAERS_ID >= covid_earliest_vaers_id ]
    df_syms     = df_syms    .loc[ df_syms    .VAERS_ID >= covid_earliest_vaers_id ]
    print() ; print(f'{len_before - len(df_data_all):>10} records removed prior to the first covid report (covid_earliest_vaers_id {covid_earliest_vaers_id})')
    print(f'{len(df_data_all):>10} covid vax reports to work with (unique VAERS_IDs)') ; print()

    vids_list     = df_data_all.VAERS_ID.to_list()
    lo_data_all   = min(vids_list)
    hi_data_all   = max(vids_list)
    diff_data_all = hi_data_all - lo_data_all
    len_data_all  = len(vids_list)
    missing       = diff_data_all - len_data_all
    print(f'{missing:>10} missing (any/all vax never published in covid era) is implied by only {len_data_all} present in {diff_data_all} range with lo {lo_data_all} and hi {hi_data_all}')
    print()

    vids_new = [ x for x in vids_list if x not in ever_any ]
    print(f'{len(vids_new):>10} being added to `ever_any` for this drop, any/all vax')
    ever_any.update({x:1 for x in vids_list})
    #ever_any = {**ever_any, **{x:1 for x in vids_list}}
    #ever_any = {x:1 for x in set(ever_any.keys() + vids_list)}

    # Grab all VAERS_IDs before filtering for covid, to be able to identify gaps properly
    do_never_ever( vids_list, files_date_marker, 'consolidate on df_data_all' )

    stats['drop_input_covid'] = len(df_data_all)

    '''   VAERS_IDs can have multiple records/doses/lots in each report but at least one covid19.    '''
    dfv = df_vax.copy()
    dfv['doses']          = dfv.VAERS_ID.map(dfv.VAERS_ID.value_counts())
    dfv_single_doses      = dfv.loc[ dfv.doses.eq(1) ]
    dfv_singles_w_covid   = dfv_single_doses.loc[  dfv_single_doses.VAERS_ID.isin(dfv_single_doses.loc[ dfv_single_doses.VAX_TYPE.str.contains('COVID', na=False) ].VAERS_ID.to_list())  ].sort_values(by='VAERS_ID')
    dfv_multpl_doses      = dfv.loc[ dfv.doses.ge(2) ]
    dfv_multiples_w_covid = dfv_multpl_doses.loc[  dfv_multpl_doses.VAERS_ID.isin(dfv_multpl_doses.loc[ dfv_multpl_doses.VAX_TYPE.str.contains('COVID', na=False) ].VAERS_ID.to_list())  ].sort_values(by='VAERS_ID')
    del dfv

    dfv_covid_type_both   = pd.concat([dfv_singles_w_covid.reset_index(drop=True), dfv_multiples_w_covid.reset_index(drop=True) ], ignore_index=True)
    dfv_covid_type_both   = dfv_covid_type_both.drop_duplicates()
    df_data_other_for_covid_search = df_data_all.loc[ ~df_data_all.VAERS_ID.isin(dfv_covid_type_both.VAERS_ID) ]

    df_data_covid_other_found1 = df_data_other_for_covid_search.loc[  df_data_other_for_covid_search.SYMPTOM_TEXT.str.contains(r'Pfizer|Moderna|Janssen', re.IGNORECASE, na=False) ]
    df_data_covid_other_found2 = df_data_covid_other_found1    .loc[  df_data_covid_other_found1    .SYMPTOM_TEXT.str.contains(r'Covid', re.IGNORECASE, na=False) ]
    print() ; print(f'{len(df_data_covid_other_found2):>10} additional reports captured with Pfizer|Moderna|Janssen and Covid in SYMPTOM_TEXT although not officially type COVID') ; print()

    vids_vax__covid_type_both   = dfv_covid_type_both       .VAERS_ID.to_list()
    vids_data_covid_other_found = df_data_covid_other_found2.VAERS_ID.to_list()
    vids_all_covid_list         = sorted( set(vids_vax__covid_type_both + vids_data_covid_other_found) )

    # Reducing to only covid reports
    len_data_all_prev_any = len(df_data_all)
    df_data = df_data_all.loc[ df_data_all.VAERS_ID.isin(vids_all_covid_list) ]
    df_vax  = df_vax     .loc[ df_vax     .VAERS_ID.isin(vids_all_covid_list) ]
    df_syms = df_syms    .loc[ df_syms    .VAERS_ID.isin(vids_all_covid_list) ]
    print(f'{               len_data_all_prev_any:>10} total any vax reports') ; print()
    print(f'{len_data_all_prev_any - len(df_data):>10} non-covid vax reports excluded') ; print()

    stats['drop_input_covid'] = len(df_data)

    #do_ever_covid( vids_all_covid_list )    # Only covid reports, subset of ever_any which is all vax

    len_before = len(df_data)
    df_data    = df_data.drop_duplicates(subset='VAERS_ID')
    if len(df_data) - len_before:
        print(f'{(len_before - len(df_data)):>10} duplicates dropped in df_data on VAERS_IDs')
    print(f'{len(set(df_data.VAERS_ID.to_list())):>10} covid reports to work with') ; print()

    '''   Shorten some fields in VAX (the change to title case also indicates I've touched it)      '''
    print('    Shortening some field values in VAX_NAME, VAX_MANU') ; print()

    '''
        VAX_TYPE
        COVID19
        COVID19-2
        FLU4

        VAX_NAME                                        Shorter
        COVID19 (COVID19 (MODERNA))                     C19 Moderna
        COVID19 (COVID19 (MODERNA BIVALENT))            C19 Moderna BIVALENT
        COVID19 (COVID19 (PFIZER-BIONTECH))             C19 Pfizer-BionT
        COVID19 (COVID19 (PFIZER-BIONTECH BIVALENT))    C19 Pfizer-BionT BIVALENT
        COVID19 (COVID19 (JANSSEN))
        COVID19 (COVID19 (UNKNOWN))
        COVID19 (COVID19 (NOVAVAX))
        INFLUENZA (SEASONAL) (FLULAVAL QUADRIVALENT)
        Transitional with parens removed first step
        COVID19 COVID19 MODERNA
        COVID19 COVID19 MODERNA BIVALENT
        COVID19 COVID19 PFIZER-BIONTECH
        COVID19 COVID19 PFIZER-BIONTECH BIVALENT
        COVID19 COVID19 JANSSEN
        INFLUENZA SEASONAL FLULAVAL QUADRIVALENT    '''

    df_vax = df_vax.copy()
    # VAX_NAME and VAX_MANU replacement chains are independent — run them in two threads.
    def _clean_vax_name(s):
        s = s.str.replace(r'[\)\(]'                    , r''             , regex=True)
        s = s.str.replace(r'(?:COVID(\S+)\s)+'         , r'C\1 '         , regex=True)
        s = s.str.replace(r'PFIZER.BION.*'             , r'Pfizer-BionT' , regex=True)
        s = s.str.replace(r'MODERNA'                   , r'Moderna'      , regex=True)
        s = s.str.replace(r'JANSSEN'                   , r'Janssen'      , regex=True)
        s = s.str.replace(r'INFLUENZA'                 , r'Flu'          , regex=True)
        s = s.str.replace(r'VACCINE NOT SPECIFIED'     , r'Not Specified', regex=True)
        return s
    def _clean_vax_manu(s):
        s = s.str.replace(r'UNKNOWN MANUFACTURER'      , r'Unknown'      , regex=True)
        s = s.str.replace(r'PFIZER.BION.*'             , r'Pfizer-BionT' , regex=True)
        s = s.str.replace(r'MODERNA'                   , r'Moderna'      , regex=True)
        s = s.str.replace(r'JANSSEN'                   , r'Janssen'      , regex=True)
        s = s.str.replace(r'PF.*WYETH'                 , r'Pfizer-Wyeth' , regex=True)
        s = s.str.replace(r'NOVARTIS.*'                , r'Novartis'     , regex=True)
        s = s.str.replace(r'SANOFI.*'                  , r'Sanofi'       , regex=True)
        s = s.str.replace(r'MERCK.*'                   , r'Merck'        , regex=True)
        s = s.str.replace(r'PROTEIN.*'                 , r'Protein'      , regex=True)
        s = s.str.replace(r'GLAXO.*'                   , r'Glaxo'        , regex=True)
        s = s.str.replace(r'SEQIRUS.*'                 , r'Seqirus'      , regex=True)
        s = s.str.replace(r'BERNA.*'                   , r'Berna'        , regex=True)
        return s
    with ThreadPoolExecutor(max_workers=2) as ex:
        _fut_name = ex.submit(_clean_vax_name, df_vax['VAX_NAME'].copy())
        _fut_manu = ex.submit(_clean_vax_manu, df_vax['VAX_MANU'].copy())
        df_vax['VAX_NAME'] = _fut_name.result()
        df_vax['VAX_MANU'] = _fut_manu.result()

    # To avoid jumbling in the case of tiny differences between drops, apparently this is necessary why?
    df_vax = df_vax.sort_values(by=['VAERS_ID', 'VAX_LOT', 'VAX_SITE', 'VAX_DOSE_SERIES', 'VAX_TYPE', 'VAX_MANU', 'VAX_ROUTE', 'VAX_NAME'])

    print(f'    Merging DATA into VAX ({NUM_CORES} cores)')
    _vax_s, _data_s = _pool_astype_str(df_vax, df_data)
    df_data_vax = _merge_parallel(_vax_s, _data_s)
    del _vax_s, _data_s
    if nan_alert(df_data_vax):
        pause=1  # for a breakpoint sometimes
        df_data_vax = df_data_vax.fillna('')
    print(f'{len(df_data_vax):>10} rows in df_data_vax (can be more than one row per VAERS_ID)')

    '''
    =  =  =  =  =  =  =  =  =  =  =  =  =  =  =  =  =  =  =  =  =  =  =  =  =  =  =  =  =  =
    symptom_entries
    =  =  =  =  =  =  =  =  =  =  =  =  =  =  =  =  =  =  =  =  =  =  =  =  =  =  =  =  =  =
    '''

    '''   Aggregate symptoms into new column called symptom_entries, and merge for all symptoms covered as a single string      '''
    print('    Aggregating symptoms into symptom_entries string, new column')

    df_syms_flat = symptoms_file_entries_append_to_symptom_text(df_syms)

    '''
    =  =  =  =  =  =  =  =  =  =  =  =  =  =  =  =  =  =  =  =  =  =  =  =  =  =  =  =  =  =
    Consolidation
    Save result into one file like ... 2020-12-25_VAERS_CONSOLIDATED.csv
    More than one row per VAERS_ID for the various doses/lots, thus duplicates to symptom_entries etc.
    =  =  =  =  =  =  =  =  =  =  =  =  =  =  =  =  =  =  =  =  =  =  =  =  =  =  =  =  =  =
    '''

    '''    symptom_entries to consolidated     '''
    print(f'    Merging symptom_entries into df_data_vax ({NUM_CORES} cores)')
    _dvx_s, _sym_s = _pool_astype_str(df_data_vax, df_syms_flat, cols1=['VAERS_ID', 'symptom_entries'])
    df_data_vax_syms_consolidated = _merge_parallel(_dvx_s, _sym_s)
    del _dvx_s, _sym_s
    #nan_alert(df_data_vax_syms_consolidated)   # nan in symptom_entries in 1106891 on 03/10/2021 as the symptoms file has no entry line for it even tho Current Illness: Atrial Fibrillation
    df_data_vax_syms_consolidated = df_data_vax_syms_consolidated.fillna('')
    print(f'{len(df_data_vax_syms_consolidated):>10} rows in df_data_vax_syms_consolidated') # has dupe VAERS_IDs as expected, more than one dose per report

    filename_consolidated = dir_consolidated + '/' + files_date_marker + '_VAERS_CONSOLIDATED.csv'
    print(f'    Saving result into one file w/ multiple rows per VAERS_ID: {filename_consolidated}')
    write_to_csv(df_data_vax_syms_consolidated, filename_consolidated)
    print()

    print(f'    Consolidation of {files_date_marker} done')

    return



# Integrated from original: flatten()
def flatten(files_date_marker):
    '''
        =  =  =  =  =  =  =  =  =  =  =  =  =  =  =  =  =  =  =  =  =  =  =  =  =  =  =  =  =  =
        Flattening -- Only one row per VAERS_ID
        =  =  =  =  =  =  =  =  =  =  =  =  =  =  =  =  =  =  =  =  =  =  =  =  =  =  =  =  =  =        '''
    global df_vax, df_data, df_syms_flat, df_flat_2

    if not covid_earliest_vaers_id:
        files['done'].append(files_date_marker)
        return

    print()
    print(f'    Flattening')

    files_populate_information()

    '''    Flatten only if the file doesn't already exist    '''
    if files_date_marker in files['flattened']['date']:
        if files_date_marker in files['flattened']['keyval']:       # already done, use it
            print(f"        Flattening already done: {files['flattened']['keyval'][files_date_marker]}") ; print()
            return
        else:
            print(f"    ERROR: Expected {files_date_marker} in files['flattened']['keyval']")

    pull_vax_records  = 0
    file_consolidated = ''
    if not len(df_vax):     # already exists if consolidate just created it
        # vax file records from consolidate file
        if files_date_marker in files['consolidated']['keyval']:
            file_consolidated = files['consolidated']['keyval'][files_date_marker]
            print(f"    Pulling vax records from previously consolidated file {file_consolidated}")
            pull_vax_records = 1
        else:
            print(f"    ERROR: Expected {files_date_marker} in files['consolidated']['keyval']")

    if pull_vax_records:
        df_consolidated = open_file_to_df(file_consolidated, doprint=1).fillna('')
        columns_vax     = ['VAERS_ID', 'VAX_TYPE', 'VAX_MANU', 'VAX_LOT', 'VAX_DOSE_SERIES', 'VAX_ROUTE', 'VAX_SITE', 'VAX_NAME']      # Note VAX_DATE is in data, not vax file
        columns_consolidated = df_consolidated.columns
        columns_data    = ['VAERS_ID'] + list( set(columns_consolidated) - set(columns_vax) - {'symptom_entries'} )
        # Three independent slices — convert to str in parallel.
        # vaers36: use groupby('VAERS_ID').first() (content-preserving) instead of
        # plain drop_duplicates (order-dependent "first encountered") when collapsing
        # the possibly vax-expanded consolidated rows back to one-per-VID for the
        # report-level data columns (SYMPTOM_TEXT etc). This avoids accidentally
        # picking a blanked row for a VID if row ordering in consolidated varies.
        with ThreadPoolExecutor(max_workers=3) as ex:
            _fv = ex.submit(lambda: df_consolidated[columns_vax].astype(str))
            _fd = ex.submit(lambda: df_consolidated[columns_data].groupby('VAERS_ID', sort=False, as_index=False).first().astype(str))
            _fs = ex.submit(lambda: df_consolidated[['VAERS_ID', 'symptom_entries']].groupby('VAERS_ID', sort=False, as_index=False).first().astype(str))
            df_vax       = _fv.result()
            df_data      = _fd.result()
            df_syms_flat = _fs.result()
        del file_consolidated, pull_vax_records, columns_vax, columns_data

    if not stats['drop_input_covid']:
        stats['drop_input_covid'] = len(df_data)
        print(f"=== Added count_input {stats['drop_input_covid']} in def flattening")

    '''    Aggregate VAX for single VAERS_ID         '''
    print(f'    Aggregate/flatten VAX items. Grouping by VAERS_ID ({NUM_CORES} cores)')

    df_vax = df_vax.reset_index(drop=True)
    _vids_unique = df_vax['VAERS_ID'].unique()
    if NUM_CORES > 1 and len(_vids_unique) > NUM_CORES * 100:
        # map/groupby split: O(n) single pass instead of O(n×NUM_CORES) isin() loop
        _vax_chunk_map = dict(zip(_vids_unique, np.arange(len(_vids_unique)) % NUM_CORES))
        df_vax['_chunk'] = df_vax['VAERS_ID'].map(_vax_chunk_map)
        _vax_chunks = [grp.drop(columns='_chunk').reset_index(drop=True)
                       for _, grp in df_vax.groupby('_chunk', sort=False) if len(grp)]
        df_vax = df_vax.drop(columns='_chunk')
        with Pool(min(NUM_CORES, len(_vax_chunks))) as pool:
            if SHOW_PROGRESS:
                _agg_results = list(tqdm(
                    pool.imap(_agg_vax_chunk, _vax_chunks),
                    total=len(_vax_chunks), desc="    Aggregating VAX", unit="chunk", leave=True
                ))
            else:
                _agg_results = pool.map(_agg_vax_chunk, _vax_chunks)
        df_vax_flat = pd.concat(_agg_results).reset_index()
    else:
        df_vax_flat = df_vax.groupby('VAERS_ID').agg(list).map('|'.join).reset_index()
    del df_vax  # done with this global
    len_before  = len(df_vax_flat)
    df_vax_flat = df_vax_flat.drop_duplicates(subset=df_vax_flat.columns, keep='first')
    if len(df_vax_flat) - len_before:
        print(f'\n\n\n\n\t\t\t\t {(len_before - len(df_vax_flat)):>10} duplicates dropped in df_vax_flat, THERE SHOULD BE NONE\n\n\n\n')
    check_dupe_vaers_id(df_vax_flat) ; print()

    '''    Combine DATA into VAX (has multiple records for each VAERS_ID, when more than one dose is listed)     '''
    print(f'    Merging DATA into VAX flattened ({NUM_CORES} cores)')
    _vax_s, _data_s = _pool_astype_str(df_vax_flat, df_data)
    df_data_vax_flat = _merge_parallel(_vax_s, _data_s)
    del _vax_s, _data_s
    if nan_alert(df_data_vax_flat):
        pause=1
    df_data_vax_flat = df_data_vax_flat.fillna('')
    check_dupe_vaers_id(df_data_vax_flat)

    len_before = len(df_data_vax_flat)
    df_data_vax_flat = df_data_vax_flat.drop_duplicates(subset=df_data_vax_flat.columns, keep='first')
    if len(df_data_vax_flat) - len_before:
        print(f'{(len_before - len(df_data_vax_flat)):>10} duplicates dropped in df_data_vax_flat')
        print(f'{len(df_data_vax_flat):>10} rows in df_data_vax_flat') ; print()

    '''    symptom_entries to flattened     '''
    print(f'    Merging symptom_entries into df_data_vax_syms_flat ({NUM_CORES} cores)')
    _dvf_s, _sym_s = _pool_astype_str(df_data_vax_flat, df_syms_flat, cols1=['VAERS_ID', 'symptom_entries'])
    df_data_vax_syms_flat = _merge_parallel(_dvf_s, _sym_s)
    del _dvf_s, _sym_s
    df_data_vax_syms_flat = df_data_vax_syms_flat.fillna('')    # there are nans like 1106891 with no record for it in ...symptoms.csv
    del df_syms_flat
    len_before = len(df_data_vax_syms_flat)     # TODO: Understand the reason for the dupes
    df_data_vax_syms_flat = df_data_vax_syms_flat.drop_duplicates(subset=df_data_vax_syms_flat.columns, keep='first')
    if len(df_data_vax_syms_flat) - len_before:
        print(f'{(len_before - len(df_data_vax_syms_flat)):>10} duplicates dropped in df_data_vax_syms_flat') ; print()

    '''
        SMALLER SYMPTOM_TEXT, REMOVAL OF REPEAT SENTENCES, DEDUPE
    '''
    # vaers36: the dedupe now has multiple layers of "never blank a VID that had text"
    # (worker try, and caller map + pre-sym fallback). This directly addresses the
    # systematic SYMPTOM_TEXT='' in vaers35.py FINAL outputs for high-edit reports.
    df_data_vax_syms_flat = symptoms_dedupe_repeat_sentences(df_data_vax_syms_flat)

    '''   Save result into one file like ... 2020-12-25_VAERS_FLATTENED.csv      '''
    filename_flattened = dir_flattened + '/' + files_date_marker + '_VAERS_FLATTENED.csv'
    print(f'    Saving result into one file: {filename_flattened}') ; print()
    write_to_csv(df_data_vax_syms_flat, filename_flattened)
    print(f'{len(df_data_vax_syms_flat):>10} rows with unique  VAERS_IDs in {filename_flattened}') ; print()

    print(f'    Flattening of {files_date_marker} done') ; print()

    df_flat_2        = df_data_vax_syms_flat
    dict_done_flag['flat2'] = files_date_marker

    return

    '''
        Some VAERS_ID in 2021-01-07 for example with multiple lots/doses (multiple rows in VAX file) for testing
        903999 903157 903848 903867 904522 905340 906428 907571 907776 907837 908431 908448 908763 909370 909520 910580 919620
            18 doses: https://medalerts.org/vaersdb/findfield.php?IDNUMBER=1900339&WAYBACKHISTORY=ON                '''



# Integrated from original: compare()
def compare(date_currently):
    '''
        Compare based on flattened (one row per VAERS_ID with each row containing all of the information)
           saving modifications to changes column.

        Add reports to result set when not already there.

        VAX file columns:
            VAERS_ID    VAX_TYPE    VAX_MANU    VAX_LOT    VAX_DOSE_SERIES    VAX_ROUTE    VAX_SITE    VAX_NAME
                There are multiple entries with same VAERS_ID, different VAX_LOT and VAX_DOSE_SERIES    '''
    global df_flat_1

    if not covid_earliest_vaers_id:
        return

    files_populate_information()

    file_flattened_previous = get_flattened('previous', date_currently)   # like ... 2022-10-28_VAERS_FLATTENED.csv
    file_flattened_working  = get_flattened('working' , date_currently)   # like ... 2022-11-04_VAERS_FLATTENED.csv

    if not file_flattened_previous:
        print() ; print(f'        No flattened file to compare prior to this, {date_currently}, skip compare')
        file_for_copy_original = dir_compared    + '/' + date_currently + '_VAERS_FLATFILE.csv'
        print(f'        Due to first drop, copying current {file_flattened_working} to: {file_for_copy_original}') ; print()

        shutil.copy(file_flattened_working, file_for_copy_original)

        # Give it these columns for consistency
        df_flat_init = open_file_to_df(file_for_copy_original, doprint=0)
        df_flat_init['cell_edits'] = 0
        df_flat_init['status'    ] = ''
        df_flat_init['changes'   ] = ''
        df_flat_init = set_columns_order(df_flat_init)
        write_to_csv(df_flat_init, file_for_copy_original, open_file=0)

        # Initial row in stats file when only one compare/changes file
        do_never_ever( df_flat_init.VAERS_ID.to_list(), date_currently, 'compare on df_flat_init')
        do_ever_covid( sorted(set(df_flat_init.VAERS_ID.to_list())) )
        stats_resolve(date_currently)

        return

    previous_date = date_from_filename(file_flattened_previous)
    if not previous_date:   # first run?
        exit('No previous_date, cannot continue')

    '''    Load previous and current flattened files — read from disk in parallel if needed    '''
    _prv_in_mem = len(df_flat_1) > 0
    _new_in_mem = len(df_flat_2) > 0

    if _prv_in_mem:
        print(f"        Using flat {dict_done_flag['flat1']} already in memory, {len(df_flat_1)} rows")
        df_flat_prv = df_flat_1.copy()
    if _new_in_mem:
        print(f"        Using flat {dict_done_flag['flat2']} already in memory, {len(df_flat_2)} rows") ; print()
        df_flat_new = df_flat_2.copy()
        df_flat_new = types_set(df_flat_new)

    if not _prv_in_mem and not _new_in_mem:
        print(f'        Reading both flat files in parallel ({NUM_CORES} cores)')
        with ThreadPoolExecutor(max_workers=2) as ex:
            fut_prv = ex.submit(open_file_to_df, file_flattened_previous, 1)
            fut_new = ex.submit(open_file_to_df, file_flattened_working,  1)
            df_flat_prv = fut_prv.result()
            df_flat_new = fut_new.result()
        check_dupe_vaers_id(df_flat_prv)
        check_dupe_vaers_id(df_flat_new)
    elif not _prv_in_mem:
        df_flat_prv = open_file_to_df(file_flattened_previous, doprint=1)
        check_dupe_vaers_id(df_flat_prv)
    elif not _new_in_mem:
        df_flat_new = open_file_to_df(file_flattened_working, doprint=1)
        check_dupe_vaers_id(df_flat_new)

    if not stats['drop_input_covid']:
        stats['drop_input_covid'] = len(df_flat_new)
        print(f"=== Added count_input {stats['drop_input_covid']} in def compare")

    dict_done_flag['flat1'] = date_currently

    '''    cell_edits and changes fields only exist in the changes file
              Have to be pulled from there to be updated when applicable    '''
    print() ; print(f'    Previous changes file for changes, cell_edits and status columns')

    '''     Previous changes filename, conditions, for compare    '''
    # KEEP FOR NOW if len(df_flat_prv):
    # KEEP FOR NOW     print(f"        Using changes {dict_done_flag['changes']} already in memory")
    # KEEP FOR NOW elif len( files['changes']['date'] ):
    # KEEP FOR NOW     dates_changes             = files['changes']['date']
    # KEEP FOR NOW     date_prv_candidates       = [x for x in dates_changes if x < date_currently]
    # KEEP FOR NOW     date_changes_prv          = date_prv_candidates[-1]  # most recent
    # KEEP FOR NOW     filename_changes_previous = files['changes']['keyval'][date_changes_prv]  # like TODO
    # KEEP FOR NOW     df_flat_prv = open_file_to_df(filename_changes_previous, doprint=1)
    # KEEP FOR NOW     del dates_changes, date_prv_candidates, date_changes_prv, filename_changes_previous
    # KEEP FOR NOW
    # KEEP FOR NOW else:
    # KEEP FOR NOW     '''   Resort to initializing with flattened.      '''
    # KEEP FOR NOW     print(f'    Due to first CHANGES, using {file_flattened_previous}')

    # If first run, it needs these columns since only changes files contain these columns
    if 'cell_edits' not in df_flat_prv.columns: df_flat_prv['cell_edits'] = int(0)
    if 'changes'    not in df_flat_prv.columns: df_flat_prv['changes'   ] = ''
    if 'status'     not in df_flat_prv.columns: df_flat_prv['status'    ] = ''
    df_flat_prv  = df_flat_prv.sort_values(by=['cell_edits', 'status', 'changes'], ascending=False)

    if nan_alert(df_flat_prv):
        pause=1                         # nans in  ['AGE_YRS', 'CAGE_YR', 'CAGE_MO', 'HOSPDAYS', 'NUMDAYS']

    del file_flattened_previous, file_flattened_working

    #df_flat_prv_ori = df_flat_prv.copy() ; df_flat_new_ori = df_flat_new.copy()

    print()
    print(f'{" "*30}        Comparing')
    print() ; print(f'{" "*30} {previous_date} v. {date_currently}')

    '''
        1. Identical in both prv and new
        2. Excess in prv
            a. Deleted in new
            b.
        3. Excess in new
            a. Brand new above highest prv ID
            b. Restored
            c.
        4. Both prv and new remaining
            a. Changed
    '''

    # tmp for testing, normally done in flat but using flat files cooked up manually
    if use_test_cases:
        df_flat_new    = symptoms_dedupe_repeat_sentences(df_flat_new)
        df_flat_prv    = symptoms_dedupe_repeat_sentences(df_flat_prv)

    print()
    print(f'{  len(df_flat_new):>10} this drop total covid')
    print(f'{  len(df_flat_prv):>10}  previous total covid')

    '''    Initialize df_edits, starting with df_flat_new,
                then they will be changed in some cases later, eventually going to df_changes_done    '''
    # for ensuring all present on return
    df_both_flat_inputs = pd.concat([df_flat_prv.reset_index(drop=True), df_flat_new.reset_index(drop=True) ], ignore_index=True).drop_duplicates(subset='VAERS_ID')

    df_edits        = df_flat_new.copy()
    df_changes_done = pd.DataFrame(columns=list(df_flat_prv.columns))
    list_vids_changes_in_edits = df_flat_prv.loc[ df_flat_prv.VAERS_ID.isin(df_edits      .VAERS_ID) ].VAERS_ID.to_list()
    list_vids_edits_in_prv = df_edits.loc      [ df_edits      .VAERS_ID.isin(df_flat_prv.VAERS_ID) ].VAERS_ID.to_list()
    list_vids_in_both = list(  set(list_vids_changes_in_edits) & set(list_vids_edits_in_prv)  )
    print(f'{len(list_vids_in_both):>10} in both previous and new, copying their values in edits, status, changes')

    # Bring in the previous changes etc
    list_vids_both_only_to_copy = sorted( set(df_flat_prv.VAERS_ID.to_list()) & set(df_edits.VAERS_ID.to_list()) )

    list_vids_w_changes_in_both = df_flat_prv.loc[df_flat_prv.cell_edits.ne(0) | df_flat_prv.changes.ne('') | df_flat_prv.status.ne('')].VAERS_ID.to_list()

    # Only those with meaningful (changed values) to copy
    list_vids_filtered_both_only_to_copy = sorted( set(list_vids_both_only_to_copy) & set(list_vids_w_changes_in_both) )

    if len(list_vids_w_changes_in_both):
        # Replace 3 sequential copy_column calls (each: isin filter + copy + concat × 2)
        # with a single vectorized map over the same filter — ~6× fewer large operations.
        _vids_set   = set(list_vids_filtered_both_only_to_copy)
        _edit_mask  = df_edits.VAERS_ID.isin(_vids_set)
        _prv_lookup = df_flat_prv.loc[df_flat_prv.VAERS_ID.isin(_vids_set)].set_index('VAERS_ID')
        _edit_vids  = df_edits.loc[_edit_mask, 'VAERS_ID']
        for _col in ('cell_edits', 'changes', 'status'):
            df_edits.loc[_edit_mask, _col] = _edit_vids.map(_prv_lookup[_col]).values
        df_edits = df_edits.sort_values(['cell_edits', 'status', 'changes'], ascending=False)

    # Ensure types are set correctly regardless of whether there are changes
    df_edits = types_set(df_edits)
    warn_mixed_types(df_edits)

    if nan_alert(df_edits):
        pause=1   # for breakpoint in debugger

    # Add any others from df_flat_prv not in df_edits
    df_flat_prv_not_in_edits = df_flat_prv.loc[ ~df_flat_prv.VAERS_ID.isin(df_edits.VAERS_ID) ]  # PICKS UP DELETED ALSO
    print(f'            Concat of {len(df_flat_prv_not_in_edits)} previous that were not in edits')
    if len(df_flat_prv_not_in_edits):
        df_edits = pd.concat([ df_edits.reset_index(drop=True), df_flat_prv_not_in_edits.reset_index(drop=True) ], ignore_index=True)
        # After concat, types may get mixed up, so reset them
        df_edits = types_set(df_edits)

    '''    Identical. prv and new
                Set cell_edits 0 and nothing in changes column or status.
                Move to df_changes_done (removing from df_flat_prv and df_flat_new)     '''
    print(f'            Moving identical rows to done ({NUM_CORES} cores)')
    # astype(str) for object arrays is GIL-bound — threads give no real parallelism.
    # Use a forked Pool: workers inherit both DataFrames via CoW, each converts a
    # column-subset, returning small partial DataFrames that we concat back.
    global _g_df_pair, _g_pb_cmp, _g_nb_cmp
    _g_df_pair = [df_flat_prv, df_flat_new]
    _prv_col_chunks = [list(c) for c in np.array_split(df_flat_prv.columns.tolist(),
                                                        min(NUM_CORES, len(df_flat_prv.columns)))]
    _new_col_chunks = [list(c) for c in np.array_split(df_flat_new.columns.tolist(),
                                                        min(NUM_CORES, len(df_flat_new.columns)))]
    _astype_tasks = [(0, c) for c in _prv_col_chunks] + [(1, c) for c in _new_col_chunks]
    with Pool(min(NUM_CORES, len(_astype_tasks))) as _apool:
        _aresults = _apool.map(_astype_str_fillna_pair_chunk, _astype_tasks)
    _g_df_pair = []
    _prv_s = pd.concat(_aresults[:len(_prv_col_chunks)], axis=1)
    _new_s = pd.concat(_aresults[len(_prv_col_chunks):], axis=1)

    _in_both = set(_prv_s['VAERS_ID'].tolist()) & set(_new_s['VAERS_ID'].tolist())
    if _in_both:
        _pb   = _prv_s[_prv_s['VAERS_ID'].isin(_in_both)].sort_values('VAERS_ID').reset_index(drop=True)
        _nb   = _new_s[_new_s['VAERS_ID'].isin(_in_both)].sort_values('VAERS_ID').reset_index(drop=True)
        _cols = [c for c in _pb.columns if c in _nb.columns]
        del _prv_s, _new_s
        # Element-wise string comparison is also GIL-bound — split columns across workers.
        _g_pb_cmp, _g_nb_cmp = _pb, _nb
        _cmp_chunks = [list(c) for c in np.array_split(_cols, min(NUM_CORES, len(_cols)))]
        with Pool(len(_cmp_chunks)) as _cpool:
            _same_arrays = _cpool.map(_compare_rows_col_chunk, _cmp_chunks)
        _g_pb_cmp = _g_nb_cmp = None
        _same_np = np.logical_and.reduce(_same_arrays) if len(_same_arrays) > 1 else _same_arrays[0]
        vids_identical = _pb.loc[pd.Series(_same_np, index=_pb.index), 'VAERS_ID'].astype(int).tolist()
        del _pb, _nb
    else:
        vids_identical = []
        del _prv_s, _new_s

    # Move identicals to done, also remove these IDs in prv and new
    if vids_identical:
        df_edits, df_changes_done = move_rows(df_edits.loc[ df_edits.VAERS_ID.isin(vids_identical) ], df_edits, df_changes_done)
        print(f'            Removing identicals done from previous and new')
        df_flat_prv    = df_flat_prv.loc[ ~df_flat_prv.VAERS_ID.isin(vids_identical) ]
        df_flat_new    = df_flat_new.loc[ ~df_flat_new.VAERS_ID.isin(vids_identical) ]
    print(f'{len(vids_identical):>10} identical set aside into df_changes_done (first content)')
    ######del df_merged, df_identical, vids_identical

    print(f'{len(df_flat_new):>10} this drop remaining to work with')
    print(f'{len(df_flat_prv):>10}  previous remaining to work with')
    print(f'{len(df_flat_new) - len(df_flat_prv):>10} difference')

    # New stuff can be one of three things: higher than the highest prv VAERS_ID or gapfill or the restore of a deleted report
    list_new_not_in_prv = df_flat_new.loc[ ~df_flat_new.VAERS_ID.isin(df_flat_prv.VAERS_ID) ].VAERS_ID.to_list()


    '''    Deleted. Flat prv-only

           897309 NOT DELETED   897309  42  Deleted 2020-12-25          0   U           USPFIZER INC2020443518                                          2020-11-17  2020-11-18      probably polycellulitis; The whole half of the arm was really red and swollen; The whole half of the arm was really red and swollen; This is a spontaneous report from a contactable pharmacist. A patient of unspecified age and gender received pneumococcal 13-val conj vac (dipht crm197 protein) (PREVNAR 13), intramuscular on an unspecified date at single dose for immunization. The patient's medical history and concomitant medications were not reported. The patient experienced polycellulitis, the whole half of the arm was really red and swollen on an unspecified date. The reporter mentioned that a patient developed something that the reporter thought was probably polycellulitis after shot of unknown pneumonia vaccine product; product might have been Prevnar 13 but reporter was not sure. The patient called the doctor who sent them to a clinic to be seen that day and think theirs was maybe 3 days since they had gotten the unknown pneumonia vaccine if the reporter remembered correctly. This occurred maybe 6 months ago. The reporter just remembered the whole half of the arm was really red and swollen. The outcome of the events was unknown. Lot/batch number has been requested.; Sender's Comments: A contributory role of PREVNAR 13 cannot be fully excluded in triggering the onset of probably polycellulitis with the whole half of the arm was really red and swollen.  The impact of this report on the benefit/risk profile of the Pfizer product is evaluated as part of Pfizer procedures for safety evaluation, including the review and analysis of aggregate data for adverse events. Any safety concern identified as part of this review, as well as any appropriate action in response, will be promptly notified to Regulatory Authorities, Ethics Committees and Investigators, as appropriate.   PNC13   Pfizer-Wyeth        UNK OT      PNEUMO PREVNAR13    0   0   0       U   0       UNK                     2       _|_Cellulitis_|_Erythema_|_Peripheral swelling_|_

                They all exist in df_edits but not in df_flat_new.
                Add cell_edits +42 and in status column add Deleted.
                Move to df_changes_done, removing from df_edits and df_flat_prv, also from df_flat_new as no compare possible of course.     '''
    list_deleted_prior = []
    list_prv_not_in_new = df_flat_prv.loc[ ~df_flat_prv.VAERS_ID.isin(df_flat_new.VAERS_ID) ].VAERS_ID.to_list()
    if len(list_prv_not_in_new):  # 12-25 deleted for real, 3 of them: [902986, 903003, 903130]

        # Deleted already noted earlier, hangers-on
        df_deleted_already_noted = df_edits.loc[ df_edits.VAERS_ID.isin(list_prv_not_in_new) &  df_edits.status.str.contains(r'Deleted \d{4}-\d{2}-\d{2}    $', na=False) ] # note end of line, $
        list_deleted_prior       = df_deleted_already_noted.VAERS_ID.to_list()

        # Deleted new, not already noted earlier
        df_deleted_to_notate_new = df_edits.loc[ df_edits.VAERS_ID.isin(list_prv_not_in_new) & ~df_edits.VAERS_ID.isin(df_deleted_already_noted.VAERS_ID) ]
        list_deleted_to_notate_new = df_deleted_to_notate_new.VAERS_ID.to_list()

        if len(list_deleted_to_notate_new):
            df_edits.loc[ df_edits.VAERS_ID.isin(list_deleted_to_notate_new), 'status'     ] += f'Deleted {date_currently}    '
            df_edits.loc[ df_edits.VAERS_ID.isin(list_deleted_to_notate_new), 'cell_edits' ] += len(columns_vaers)
        len_newly_deleted = len(list_deleted_to_notate_new)

        print(f'{      len_newly_deleted:>10} newly deleted kept: {subrange(list_deleted_to_notate_new, 6)}')
        print(f'{len(list_deleted_prior):>10} deleted that were noted already: {subrange(list_deleted_prior, 6)}')

        #df_edits, df_changes_done = move_rows(df_edits.loc[df_edits.VAERS_ID.isin(list_deleted_prior)], df_edits, df_changes_done)
        #df_edits, df_changes_done = move_rows(df_edits.loc[df_edits.VAERS_ID.isin(list_deleted_to_notate_new)], df_edits, df_changes_done)
        #df_flat_prv    = df_flat_prv   .loc[ ~df_flat_prv   .VAERS_ID.isin(list_prv_not_in_new) ]
        #df_flat_prv    = df_flat_prv   .loc[ ~df_flat_prv   .VAERS_ID.isin(list_deleted_prior) ]  # remove them from flat_prv, and they are already not in new
        #df_flat_prv    = df_flat_prv   .loc[ ~df_flat_prv   .VAERS_ID.isin(list_deleted_to_notate_new) ]
        stats['deleted'] += len_newly_deleted

        # These are in prv and NOT new so no compare is possible, they're done
        df_edits, df_changes_done = move_rows(df_edits.loc[df_edits.VAERS_ID.isin(list_prv_not_in_new)], df_edits, df_changes_done)
        df_flat_prv = df_flat_prv.loc[ ~df_flat_prv.VAERS_ID.isin(list_prv_not_in_new) ]  # remove from prv


    ######del df_deleted_to_notate_new, list_deleted_to_notate_new

    '''
        1. Identical in both prv and new
        2. Excess in prv
            a. Deleted in new
            b.
        3. Excess in new
            a. Brand new above highest prv ID
            b. Restored
            c. Gapfill
        4. Both prv and new remaining
            a. Changed
    '''

    '''    Restored. Flat new that were Deleted but now being Restored
                They all exist in df_edits and are marked Deleted.
                Add cell_edits +42 and add status column Restored.
                Remove them from df_flat_new but keep them in df_edits since there could be other changes also.        '''
    df_restored = df_edits.loc[ df_edits.VAERS_ID.isin(df_flat_new.VAERS_ID) & df_edits.status.str.contains(r'Deleted \d{4}-\d{2}-\d{2}    $', na=False) ]
    list_restored = df_restored.VAERS_ID.to_list()
    if list_restored:
        # WAIT, ARE THESE IN DF_EDITS OR JUST DF_FLAT_NEW?
        df_edits.loc[ df_edits.VAERS_ID.isin(list_restored), 'cell_edits' ] += len(columns_vaers)
        df_edits.loc[ df_edits.VAERS_ID.isin(list_restored), 'status' ]     += f'Restored {date_currently}    '   # Mark them as Restored
        df_edits, df_changes_done = move_rows(df_edits.loc[ df_edits.VAERS_ID.isin(list_restored) ], df_edits, df_changes_done)  # TODO: If changes also, those are missed here
        ######    WTF  Do rerun  #####df_flat_new = df_flat_new.loc[ ~df_flat_new.VAERS_ID.isin(list_restored) ]  # remove them from df_flat_new being scrutinized
        df_flat_prv = df_flat_prv.loc[ ~df_flat_prv.VAERS_ID.isin(list_restored) ]  # some confusion, just removing, TODO
        stats['restored'] += len(list_restored)
    ######del df_deleted_already_noted

    '''    Greater-Than.  Flat new-only high VAERS_ID: df_flat_new VAERS_ID above max df_flat_prv
                Set cell_edits 0 and nothing in changes or status column.
                Move to df_changes_done (removing from df_flat_new)     '''
    list_flat_new_gt_vids = df_flat_new.loc[ df_flat_new.VAERS_ID.gt( df_flat_prv.VAERS_ID.max() ) ].VAERS_ID.to_list()
    list_flat_new_Lt_vids = df_flat_new.loc[ df_flat_new.VAERS_ID.lt( df_flat_prv.VAERS_ID.max() ) ].VAERS_ID.to_list()

    '''    Gapfill/Throttled/Delayed/Late. Flat new-only remaining.  (VAERS_IDs that didn't show up in in the sequence their drop but only later)
                First appearance, late. Is possible because all greater-than prv.max() have already been moved.
                Move from df_edits to df_changes_done and remove from df_flat_new         '''
    list_prv_vids = df_flat_prv.VAERS_ID.to_list()
    list_gapfills = sorted( set(list_flat_new_Lt_vids) - set(list_prv_vids) )
    list_gapfills = sorted( set(list_gapfills)         - set(list_restored) )  # cludgy but seems to get the job done
    if len(list_gapfills):
        df_edits = df_edits.copy().sort_values(by=['status', 'changes', 'cell_edits'], ascending=False)
        df_edits.loc[ df_edits.VAERS_ID.isin(list_gapfills), 'status' ] += f'Delayed {date_currently}    '  # staying with the term Delayed for now
        stats['gapfill'] += len(list_gapfills)
        # These are in new and NOT prv so no compare is possible, they're done
        df_edits, df_changes_done = move_rows(df_edits.loc[df_edits.VAERS_ID.isin(list_gapfills)], df_edits, df_changes_done)
        df_flat_new = df_flat_new.loc[ ~df_flat_new.VAERS_ID.isin(list_gapfills) ]  # remove from new

    check_dupe_vaers_id(df_edits)
    check_dupe_vaers_id(df_changes_done)

    print(f'{  len(list_flat_new_gt_vids):>10} new (higher than max VAERS_ID in previous) for {date_currently}')
    print(f'{          len(list_gapfills):>10} delayed this drop, filling gaps' )
    print(f'{           len_newly_deleted:>10} newly deleted this drop kept' )
    print(f'{          len(list_restored):>10} restored this drop {subrange(list_restored, 6)}' )
    ######del list_flat_new_gt_vids, list_gapfills, list_prv_not_in_new, list_restored, len_newly_deleted

    len_before = len(df_flat_prv)
    df_flat_prv = df_flat_prv.drop_duplicates(subset=df_flat_prv.columns, keep='last')
    if len_before - len(df_flat_prv):      # Should be none. Overwrite of values now done
        print(f'\n\n\n\n\t\t\t\t {(len_before - len(df_flat_prv)):>10} complete duplicates dropped in df_flat_prv, SHOULD NOT HAPPEN \n\n\n\n')

    len_before = len(df_flat_new)
    df_flat_new = df_flat_new.drop_duplicates(subset=df_flat_new.columns, keep='last')      # Should be none. Overwrite of values now done
    if len_before - len(df_flat_new):
        print(f'\n\n\n\n\t\t\t\t {(len_before - len(df_flat_new)):>10} complete duplicates dropped in df_flat_new, SHOULD NOT HAPPEN \n\n\n\n')

    # Brand new reports only to done, no compare needed
    df_only_flat_new = df_flat_new.loc[ ~df_flat_new.VAERS_ID.isin(df_flat_prv.VAERS_ID) ]
    list_only_flat_new = df_only_flat_new.VAERS_ID.to_list()
    if list_only_flat_new:
        df_flat_new = df_flat_new.loc[ ~df_flat_new.VAERS_ID.isin(list_only_flat_new) ]
        df_edits, df_changes_done = move_rows(df_edits.loc[df_edits.VAERS_ID.isin(list_only_flat_new)], df_edits, df_changes_done)

    # Ensure prv and new are now the same reports, to be compared
    if prv_new_error_check(df_flat_prv, df_flat_new):
        pause=1  # nans in ['CAGE_YR', 'CAGE_MO', 'HOSPDAYS']
    if nan_alert(df_flat_prv) or nan_alert(df_flat_new):
        pause=1
    '''     Remaining are only common VAERS_IDs and same number of rows containing some fields that vary.    '''

    df_flat_prv = df_flat_prv.sort_values(by='VAERS_ID') ; df_flat_prv = df_flat_prv.reset_index(drop=True)
    df_flat_new = df_flat_new.sort_values(by='VAERS_ID') ; df_flat_new = df_flat_new.reset_index(drop=True)
    df_flat_new = df_flat_new.reindex(df_flat_prv.columns, axis=1)  # Same column order. TODO: Find out how the columns changed order after adding the blanks block of code below

    if len(df_edits):
        if nan_alert(df_edits):
            pause=1

    len_edits = 0           # ongoing count of modified reports
    vid_cols_affected = {}

    warn_mixed_types(df_edits)
    df_flat_prv = types_set(df_flat_prv)
    warn_mixed_types(df_flat_prv)
    warn_mixed_types(df_flat_new)
    warn_mixed_types(df_changes_done)

    list_prv_and_new_both = set( df_flat_prv.VAERS_ID.to_list() ) & set( df_flat_new.VAERS_ID.to_list() )
    if not len(df_flat_new):
        print(f'{str(0):>10} column changes in {len(columns_vaers)} columns')
    else:
        print() ; print('    Column value changes') ; print()      # section could do with some refactoring smarter surely

        _skip = {'cell_edits', 'status', 'changes'}
        try:
            if len(df_flat_prv) != len(df_flat_new):
                raise ValueError(f"Row count mismatch: prv={len(df_flat_prv)} new={len(df_flat_new)}")
            # Fast per-column equals() check — avoids building the expensive compare() output DataFrame
            cols_changed_list = sorted([
                col for col in df_flat_prv.columns
                if col not in _skip and not df_flat_prv[col].equals(df_flat_new[col])
            ])
        except Exception as e:            # note error, patch and allow to continue
            error(f'    prv and new len diff {e}    {inspect.stack()[1][2]}')
            print(f'{line()} {e}')
            if   len(df_flat_prv) > len(df_flat_new):   # TODO: Is this insane?????  WHY ARE THEY DIFFERENT IN THE FIRST PLACE????
                df_flat_prv = df_flat_prv.loc[ df_flat_prv.VAERS_ID.isin(df_flat_new.VAERS_ID) ]
                df_flat_prv = df_flat_prv.reset_index(drop=True)
                df_flat_new = df_flat_new.reset_index(drop=True)
            elif len(df_flat_prv) < len(df_flat_new):
                df_flat_new = df_flat_new.loc[ df_flat_new.VAERS_ID.isin(df_flat_prv.VAERS_ID) ]
                df_flat_prv = df_flat_prv.reset_index(drop=True)
                df_flat_new = df_flat_new.reset_index(drop=True)
            cols_changed_list = sorted([
                col for col in df_flat_prv.columns
                if col not in _skip and not df_flat_prv[col].equals(df_flat_new[col])
            ])

        check_dupe_vaers_id(df_flat_prv)
        check_dupe_vaers_id(df_flat_new)
        count_cols_altered    = len(cols_changed_list)
        show_col_row_progress = 0
        count_cols            = 0
        '''    Debug utility
                    Specify a VAERS_ID in vid_target
                       and/or a debug_cols to focus on.        '''

        # Build per-column args in parallel using a fork-based Pool.
        # Forked workers inherit the full-column numpy arrays from module globals without
        # pickling them over IPC — each worker receives only a tiny column-name string.
        global _g_prv_arrs, _g_new_arrs, _g_ids_np_global, _g_col_date, _g_col_test
        _g_prv_arrs      = {col: df_flat_prv[col].to_numpy() for col in cols_changed_list}
        _g_new_arrs      = {col: df_flat_new[col].to_numpy() for col in cols_changed_list}
        _g_ids_np_global = df_flat_prv['VAERS_ID'].to_numpy()
        _g_col_date      = date_currently
        _g_col_test      = use_test_cases
        _build_w = min(NUM_CORES, len(cols_changed_list)) if cols_changed_list else 1
        print(f'    Building {len(cols_changed_list)} column filter masks ({_build_w} cores)...')
        with Pool(_build_w) as _build_pool:
            col_args = list(_build_pool.map(_build_col_arg_global, cols_changed_list))
        _g_prv_arrs.clear(); _g_new_arrs.clear()
        _g_ids_np_global = _g_col_date = _g_col_test = None

        _total_chg_rows = sum(len(a[1]) for a in col_args)
        # Sub-chunk large columns so one slow column (e.g. SYMPTOM_TEXT with 50k changes)
        # doesn't starve all other cores.  Target ~NUM_CORES*4 tasks total so every
        # worker stays busy and load balances naturally with chunksize=1.
        _row_task_cap = max(2000, _total_chg_rows // max(1, NUM_CORES * 4))
        _flat_tasks   = []   # list of col_arg tuples (possibly sub-chunked)
        _task_labels  = []   # column name for each task (for reassembly)
        for _ca in col_args:
            _col, _ids, _prv, _new, _dt, _tc = _ca
            _n = len(_ids)
            if _n <= _row_task_cap:
                _flat_tasks.append(_ca)
                _task_labels.append(_col)
            else:
                _n_sub = min(NUM_CORES, max(2, _n // _row_task_cap))
                for _idx in np.array_split(np.arange(_n), _n_sub):
                    _flat_tasks.append((_col, _ids[_idx], _prv[_idx], _new[_idx], _dt, _tc))
                    _task_labels.append(_col)

        _n_tasks = len(_flat_tasks)
        print(f'    Processing {len(col_args)} changed columns → {_n_tasks} tasks ({_total_chg_rows:,} changed-row pairs) across {NUM_CORES} cores...')
        with Pool(min(NUM_CORES, _n_tasks)) as pool:
            if SHOW_PROGRESS:
                _raw_results = list(tqdm(
                    pool.imap(_compare_col_worker, _flat_tasks, chunksize=1),
                    total=_n_tasks, desc="    Comparing columns", unit="task", leave=True
                ))
            else:
                _raw_results = list(pool.imap(_compare_col_worker, _flat_tasks, chunksize=1))

        # Merge sub-chunk results back into one result per column
        from collections import defaultdict as _dd
        _by_col = _dd(list)
        for _lbl, _res in zip(_task_labels, _raw_results):
            _by_col[_lbl].append(_res)
        all_col_results = []
        for _col in dict.fromkeys(_task_labels):   # preserve original column order
            _chunks = _by_col[_col]
            if len(_chunks) == 1:
                all_col_results.append(_chunks[0])
            else:
                _m_rup = []; _m_bb = None
                _m_st  = {'cells_edited': 0, 'cells_emptied': 0,
                          'trivial_changes_ignored': 0, 'col_count': 0}
                _m_pl  = []
                for _, _rup, _bb, _st, _pl in _chunks:
                    _m_rup.extend(_rup)
                    if _bb:
                        if _m_bb is None:
                            _m_bb = {**_bb, 'vids': list(_bb['vids']),
                                     'prv_vals': dict(_bb['prv_vals'])}
                        else:
                            _m_bb['vids'].extend(_bb['vids'])
                            _m_bb['prv_vals'].update(_bb['prv_vals'])
                            _m_bb['cells_emptied'] += _bb['cells_emptied']
                    for _k in _m_st:
                        _m_st[_k] += _st[_k]
                    _m_pl.extend(_pl)
                all_col_results.append((_col, _m_rup, _m_bb, _m_st, _m_pl))

        # Apply all results sequentially to df_edits
        df_edits = df_edits.reset_index(drop=True)
        vid_index = pd.Series(df_edits.index, index=df_edits['VAERS_ID'].values)

        for col, row_updates, bulk_blank, col_stats, print_lines in all_col_results:
            # Flush buffered print output from worker
            for line_str in print_lines:
                print(line_str)

            # Accumulate stats
            stats['cells_edited']            += col_stats['cells_edited']
            stats['cells_emptied']            += col_stats['cells_emptied']
            stats['trivial_changes_ignored']  += col_stats['trivial_changes_ignored']
            if col_stats['col_count']:
                stats['columns'][col] = stats['columns'].get(col, 0) + col_stats['col_count']

            # Apply bulk-blank update
            if bulk_blank:
                vids_list  = bulk_blank['vids']
                prv_vals   = bulk_blank['prv_vals']
                bb_col     = bulk_blank['col']
                bb_date    = bulk_blank['date']
                stats['cells_emptied'] += bulk_blank['cells_emptied']

                mask = df_edits['VAERS_ID'].isin(vids_list)
                df_edits.loc[mask, 'cell_edits'] = df_edits.loc[mask, 'cell_edits'].astype(int) + 1
                changes_delta = df_edits.loc[mask, 'VAERS_ID'].map(
                    {v: f"{bb_col} {bb_date}: {prv_vals.get(v, '')} <> []    " for v in vids_list}
                ).fillna('')
                df_edits.loc[mask, 'changes'] = df_edits.loc[mask, 'changes'].astype(str) + changes_delta

            # Apply per-row updates (vectorized: collect then bulk-apply)
            if row_updates:
                edit_idxs  = []
                change_map = {}   # idx -> accumulated note string
                val_map    = {}   # idx -> value to keep
                for vid, edits_add, changes_note, val_to_keep in row_updates:
                    if vid not in vid_index.index:
                        continue
                    idx = vid_index[vid]
                    if edits_add:
                        edit_idxs.append(idx)
                    if changes_note:
                        change_map[idx] = change_map.get(idx, '') + changes_note
                    if val_to_keep is not None:
                        val_map[idx] = val_to_keep
                if edit_idxs:
                    df_edits.loc[edit_idxs, 'cell_edits'] = (
                        df_edits.loc[edit_idxs, 'cell_edits'].astype(int) + 1
                    )
                if change_map:
                    c_idxs = list(change_map.keys())
                    df_edits.loc[c_idxs, 'changes'] = (
                        df_edits.loc[c_idxs, 'changes'].astype(str).values
                        + [change_map[i] for i in c_idxs]
                    )
                if val_map:
                    v_idxs = list(val_map.keys())
                    df_edits.loc[v_idxs, col] = [val_map[i] for i in v_idxs]

        print()
        print(f"{count_cols_altered:>10} {single_plural(count_cols_altered, 'columns')} altered")
        del count_cols_altered

    '''
    Around here ... 
        FutureWarning: Downcasting object dtype arrays on .fillna, .ffill, .bfill is deprecated and will change in a future version. 
            Call result.infer_objects(copy=False) instead. 
            To opt-in to the future behavior, set `pd.set_option('future.no_silent_downcasting', True)`
    '''
    len_before_edits = len(df_edits)
    df_edits = df_edits.drop_duplicates(subset='VAERS_ID', keep='last')
    if len_before_edits - len(df_edits):
        print(f'{(len_before_edits - len(df_edits)):>10} duplicates removed from df_edits before final move')
    len_edits += len(df_edits)
    print(f"{len_edits:>10} modified {single_plural(len_edits, 'reports')} on {date_currently}") ; print()

    if nan_alert(df_changes_done):  # should not be necessary, TODO
        pause=1

    #df_changes_done['cell_edits'] = df_changes_done['cell_edits'].astype('float64').astype(int)

    '''    Add everything, with changes now set, into df_changes_done       '''
    df_edits, df_changes_done = move_rows(df_edits, df_edits, df_changes_done)  # moving entire df_edits to df_changes_done
    check_dupe_vaers_id(df_edits)

    '''    VAERS_IDs are int here    '''
    len_before = len(df_changes_done)   # All columns
    df_changes_done = df_changes_done.drop_duplicates(subset=df_changes_done.columns, keep='last')     # Should be none. Overwrite of values now done
    if len_before - len(df_changes_done):
        print(f'{(len_before - len(df_changes_done)):>10} complete duplicates dropped in df_changes_done')
    len_before      = len(df_changes_done)   # Just VAERS_ID, error-checking to make sure
    df_changes_done = df_changes_done.drop_duplicates(subset='VAERS_ID', keep='last')     # Should be none.
    if len_before - len(df_changes_done):
        print(f'\n\n\n\n\t\t\t\t{(len_before - len(df_changes_done)):>10} duplicate VAERS_ID dropped in df_changes_done, should never happen\n\n\n\n')

    '''        Sort the output by cell_edits. Section needs scrutiny.
    '''
    df_changes_done = df_changes_done.fillna('')
    ''' Annoying future warning and I don't know how best to resolve it. FutureWarning: Downcasting object dtype arrays on .fillna, .ffill, .bfill is deprecated and will change in a future version. Call result.infer_objects(copy=False) instead. To opt-in to the future behavior, set `pd.set_option('future.no_silent_downcasting', True)`
        https://stackoverflow.com/questions/77900971/pandas-futurewarning-downcasting-object-dtype-arrays-on-fillna-ffill-bfill
    '''
    check_dupe_vaers_id(df_changes_done)

    df_changes_done = df_changes_done.sort_values(by=['cell_edits', 'status', 'changes'], ascending=False)

    if nan_alert(df_changes_done):
        pause=1

    '''   VAERS_IDs with the most records/doses/lots in each report and at least one covid19.
              (There's surely a better way with lambda or .apply() and/or .groupby() etc).
           Here, just counting delimiters in VAX_LOT and adding 1 to that.
           A report in df_vax has 18 entries in 2020-12-25 but no covid, for example, see df_vax.VAERS_ID.value_counts().unique()

        vid_top_doses_list = sorted(df_doses.loc[ df_doses.doses.eq(df_doses.doses.max()) ].VAERS_ID.to_list())
            TypeError: '<' not supported between instances of 'str' and 'int'
                Tried df_changes_done.fillna('') above, didn't work.
                Trying VAX_LOT.astype(str)  <== TODO: Should be done earlier. Some must have been made in automatically when read.
    '''
    df_changes_done['VAX_LOT'] = df_changes_done.VAX_LOT.astype(str)
    df_doses = df_changes_done.loc[df_changes_done.VAX_LOT.astype(str).str.contains(r'\|', na=False)][['VAERS_ID', 'VAX_LOT']]
    if nan_alert(df_doses):
        pause=1
    df_doses['doses'] = df_doses.VAX_LOT.astype(str).str.count(r'\|')

    filename = dir_compared + '/' + date_currently + '_VAERS_FLATFILE.csv'
    print(f'    Writing ... {filename}') ; print()

    # When running against multiple input dates, open just the last one
    do_open = 0

    if files['input']['date'][-1] in filename:      # Last one
        do_open = 1

    df_changes_done = set_columns_order(df_changes_done)

    print()
    print(f'    This drop {do_elapsed(elapsed_drop)}')
    print(f'      Overall {do_elapsed(elapsed_begin)}')
    print()

    write_to_csv(df_changes_done, filename, open_file=do_open)   # Has to be written here, the one big file. Separation of A and B later when necessary for the Excel rows limit.

    # Thru this transition for clarity, df_changes_done becomes the next df_flat_prv (when processing multiple drops)
    df_flat_1 = df_changes_done.copy()   # use as the previous in next when processing more than one drop.

    dict_done_flag[ 'changes' ] = date_currently       # bookkeeping

    #lookup(previous_date, date_currently, vid_cols_affected)       # for debug, open a csv showing all of the reports touched for these dates compared

    vid_top_doses_list = sorted(df_doses.loc[ df_doses.doses.eq(df_doses.doses.max()) ].VAERS_ID.to_list())
    top_num = df_doses.doses.max() + 1
    print(f'{len(vid_top_doses_list):>10} {single_plural(len(vid_top_doses_list), "reports")} with the most ({top_num}) records/lots/doses: {" ".join([str(x) for x in vid_top_doses_list])}')

    #print(f"{stats['comparisons']:>10} {single_plural(stats['comparisons'], 'comparisons')} done")

    vids_present = df_both_flat_inputs.VAERS_ID.to_list()
    do_never_ever( vids_present, date_currently, 'compare on df_both_flat_inputs' )
    do_ever_covid( vids_present )
    stats_resolve( date_currently )

    '''   Ensure nothing is lost      '''
    verify_all_reports_present(df_both_flat_inputs, df_changes_done)  # Relies on collection of IDs, new, never, ever seen

    return



# Integrated from original: open_files()
def open_files(_date):   # like './vaers_drop_inputs/2020-12-25'
    '''    Input files in dir_input:
              csv within directories
              zip files in a single directory, containing csv, treated as if they are folders, sort of.    '''
    files_populate_information()

    if _date in files['consolidated']['date']:      # already consolidated
        print(f'    {_date} already consolidated, no need to copy input files to dir_working')
        shutil.rmtree(dir_working)  # removing directory
        os.mkdir(dir_working)
        set_files_date_marker(dir_working, _date)       # a flag used by consolidate()
        return True  # Already consolidated, so success

    if _date in files['flattened']['date']:
        print(f'    Skipping unzip because flattened for {_date} already exists')
        return True  # Already processed, so success

    if not _date in files['input']['keyval']:
        exit(f"     Failed to find in files['input']['keyval'] the _date {_date} in open_files()  ")
    files_value = files['input']['keyval'][_date]

    if 'csv' in files_value:
        print(f'    Copy all {_date} to {dir_working}')
        to_copy = [x for x in files['input']['files'] if _date in x]
        shutil.rmtree(dir_working)
        os.mkdir(dir_working)
        for x in to_copy:
            shutil.copy(x, dir_working)
    elif isinstance(files_value, list):               # another case earlier in development, csv files already extracted manually
        print(f'    Copy {_date}/* to {dir_working}')
        shutil.rmtree(dir_working)                    # removing directory to avoid error next line
        shutil.copytree(_date, dir_working)
        set_files_date_marker(dir_working, _date)

    elif 'zip' in files_value:                        # zip file, treat it sort of like a directory here
        shutil.rmtree(dir_working)
        os.makedirs(dir_working)
        success = files_from_zip(files_value, dir_working)
        if not success:
            print(f'    Skipping date {_date} due to zip file error')
            return False  # Indicate this date should be skipped
    else:
        exit(f'    Unexpected _date {_date} in open_files() ')

    set_files_date_marker(dir_working, _date)         # a visual aid showing input file date marker, not really a file so much

    return True  # Indicate success



# Integrated from original: verify_all_reports_present()
def verify_all_reports_present(df_compare_input, df_compare_output):     # Ensure nothing is lost
    list_in_input_not_in_output = df_compare_input.loc[ ~df_compare_input.VAERS_ID.isin(df_compare_output.VAERS_ID) ].VAERS_ID.to_list()
    if list_in_input_not_in_output:
        print(f'\n\n\nERROR:\n{len(list_in_input_not_in_output)} VAERS_IDs in df_compare_input NOT in df_compare_output \n {subrange(list_in_input_not_in_output, 6)} \n\n')

    set_vids_in_output           = set( df_compare_output.VAERS_ID.to_list() )
    set_ever_covid               = set( ever_covid.keys()  )
    vids_in_df_not_in_ever_covid = set_vids_in_output - set_ever_covid
    vids_not_in_df_in_ever_covid = set_ever_covid     - set_vids_in_output
    if vids_in_df_not_in_ever_covid:
        print(f'{len(vids_in_df_not_in_ever_covid):>10} vids_in_df_not_in_ever_covid: {subrange(vids_in_df_not_in_ever_covid, 6)}') ; print()
    if vids_not_in_df_in_ever_covid:
        print(f'{len(vids_not_in_df_in_ever_covid):>10} vids_not_in_df_in_ever_covid: {subrange(vids_not_in_df_in_ever_covid, 6)}') ; print()

    # Make certain none lost or extra
    vids_set_of_all_in_prv_and_new = set( df_compare_input.VAERS_ID.to_list() )
    vids_orphans = vids_set_of_all_in_prv_and_new - set(df_compare_output.VAERS_ID.to_list())
    vids_extras  = set(df_compare_output.VAERS_ID.to_list()) - vids_set_of_all_in_prv_and_new
    if vids_orphans:
        df_orphans = df_compare_input.loc[ ~df_compare_input.VAERS_ID.isin(df_compare_output.VAERS_ID) ]
        print(f'\n\n\n{len(df_orphans):>10} orphans, records in df_compare_input that are not in df_compare_output')
        print('    M  U  S  T     F  I  X  \n\n\n')
        write_to_csv(df_orphans, 'orphans.csv', open_file=1)
        df_orphans, df_compare_output = move_rows(df_orphans, df_orphans, df_compare_output)  # moving all of them out to done
    if vids_extras:
        df_extras = df_compare_output.loc[ ~df_compare_output.VAERS_ID.isin(df_compare_input.VAERS_ID) ]
        print(f'\n\n\n{len(df_extras):>10} extras, records in df_compare_output that are not in df_compare_input')
        print(f'    M  U  S  T     F  I  X  {subrange(df_extras.VAERS_ID.to_list(), 6)} \n\n\n')
        write_to_csv(df_extras, 'extras.csv', open_file=1)



# Integrated from original: show_vid_as_text()
def show_vid_as_text(df, vid):
    if not type(vid) is int:   # TODO:  Using type() instead of isinstance() for a typecheck. (unidiomatic-typecheck)
        print(f'    Expected vid {vid} to be an integer')
    if vid not in df.VAERS_ID.to_list():
        print(f'    vid {vid} not found in the dataframe')
        return
    filename =  f'{vid}.txt'
    df = df.loc[ df.VAERS_ID.eq(vid) ]
    df.to_csv(filename, sep=',', encoding='utf-8-sig', index=False)
    sp.Popen(filename, shell=True)




#  =============================================================================
#  FINAL MERGED FILE CREATION
#  =============================================================================

def create_final_merged_file():
    """
    Create a final merged file from the latest FLATFILE in 2_vaers_full_compared.
    This file contains all records with their complete change history.
    """
    print()
    print("="*80)
    print("CREATING FINAL MERGED OUTPUT FILE")
    print("="*80)
    print()

    # Find the latest FLATFILE
    flatfiles = glob.glob(f'{dir_compared}/*_VAERS_FLATFILE.csv')
    if not flatfiles:
        flatfiles = glob.glob(f'{dir_compared}/*_VAERS_FLATFILE_A.csv')

    if not flatfiles:
        print("    No FLATFILE found in 2_vaers_full_compared directory")
        return

    # Sort by date in filename to get the latest
    flatfiles.sort()
    latest_file = flatfiles[-1]

    print(f"    Using latest file: {os.path.basename(latest_file)}")

    try:
        # Read the latest FLATFILE
        print(f"    Reading {os.path.basename(latest_file)}...")
        df_final = pd.read_csv(latest_file, encoding='utf-8-sig', low_memory=False)

        print(f"    Total records: {len(df_final):,}")

        # Create output filename with timestamp
        output_filename = f'{dir_top}/VAERS_FINAL_MERGED.csv'

        # Write the final merged file
        print(f"    Writing final merged file: {output_filename}")
        write_to_csv(df_final, output_filename, open_file=False)

        print()
        print(f"    ✓ Final merged file created successfully!")
        print(f"    Location: {output_filename}")
        print(f"    Records: {len(df_final):,}")

        # Print summary statistics
        if 'cell_edits' in df_final.columns:
            total_edits = df_final['cell_edits'].sum()
            records_with_edits = len(df_final[df_final['cell_edits'] > 0])
            print(f"    Total cell edits: {total_edits:,}")
            print(f"    Records with edits: {records_with_edits:,}")

        if 'status' in df_final.columns:
            deleted_count = len(df_final[df_final['status'].str.contains('Deleted', na=False)])
            if deleted_count > 0:
                print(f"    Deleted records: {deleted_count:,}")

        print()

    except Exception as e:
        error(f"Failed to create final merged file: {e}")
        print(f"    Error: {e}")
        import traceback
        traceback.print_exc()


#  =============================================================================
#  MAIN EXECUTION
#  =============================================================================

def run_all():
    """Main execution function with all enhancements"""
    print(__file__)
    print()
    print('='*80)
    print('VAERS Enhanced Processing - Complete Edition')
    print('='*80)
    print()
    print('Features:')
    print('  ✓ Multi-core parallel processing')
    print('  ✓ Memory-efficient chunked data handling')
    print('  ✓ Command-line dataset selection (COVID/Full)')
    print('  ✓ Progress bars for all operations')
    print('  ✓ Comprehensive error tracking')
    print('  ✓ Fixed stats functionality')
    print('  ✓ All original processing logic integrated')
    print()

    # If merge-only flag is set, just create the final file and exit
    if args.merge_only:
        print('='*80)
        print("MERGE-ONLY MODE: Creating final merged file")
        print("="*80)
        print()
        create_final_merged_file()
        return

    if not validate_dirs_and_files():
        exit_script("Directory validation failed")

    # Determine total file count and how many are already done (for resume)
    files_populate_information()
    total_input_files = len(files['input']['date'])
    _done_set         = set(files['changes']['date'])
    files_already_done = sum(1 for d in files['input']['date'] if d in _done_set)

    print()
    print("="*80)
    if files_already_done:
        print(f"RESUMING PROCESSING  (skipping {files_already_done} already-completed drops)")
    else:
        print("STARTING PROCESSING")
    print("="*80)
    print()

    # Sticky header: row 1 of the terminal is reserved as a persistent status bar.
    # All other output scrolls in rows 2..N so the status is never pushed off-screen.
    _tty    = _run_tee._orig if _run_tee and hasattr(_run_tee, '_orig') else sys.stdout
    _sticky = _StickyHeader(_tty)

    overall_pbar = None
    if SHOW_PROGRESS and total_input_files > 0:
        overall_pbar = tqdm(
            total=total_input_files,
            initial=files_already_done,
            desc="Overall",
            unit="file",
            file=_tty,          # write directly to terminal, not through Tee
            bar_format='{desc}: {n_fmt}/{total_fmt} [{bar}] {percentage:3.0f}%  '
                       'elapsed {elapsed}  ETA {remaining}  {postfix}',
            position=0,
            dynamic_ncols=True,
        )

    file_num   = files_already_done   # start counter at the correct offset on resume
    file_times = []                   # wall-clock seconds for each completed file
    file_start = None                 # start time of the current file

    def _eta_summary(completed, total, times):
        """Return a short adaptive-ETA string from per-file timings."""
        if not times:
            return ''
        # Use the last min(completed, 5) files for a rolling average (more weight on recent)
        recent = times[-5:]
        avg = sum(recent) / len(recent)
        remaining = total - completed
        eta_sec = avg * remaining
        h = int(eta_sec // 3600)
        m = int((eta_sec % 3600) // 60)
        s = int(eta_sec % 60)
        if h:
            eta_str = f'{h}h{m:02d}m'
        elif m:
            eta_str = f'{m}m{s:02d}s'
        else:
            eta_str = f'{s}s'
        avg_str = f'{avg:.0f}s/file' if avg < 120 else f'{avg/60:.1f}m/file'
        return f'~{eta_str} remaining  ({avg_str} rolling avg, {len(times)} samples)'

    # Main processing loop
    while more_to_do():
        this_drop_date = get_next_date()
        if this_drop_date is None:
            break
        file_num += 1
        file_start = _time.time()

        files_left_now = total_input_files - file_num
        _sticky_text   = (f'File {file_num}/{total_input_files}  '
                          f'({this_drop_date})  —  {files_left_now} left')
        _sticky.update(_sticky_text)
        if overall_pbar:
            overall_pbar.set_postfix_str(f'{this_drop_date}  [{file_num}/{total_input_files}]')

        print_date_banner(this_drop_date, file_num, total_input_files)
        stats_initialize(this_drop_date)

        # Try to open files, skip this date if it fails
        if open_files(this_drop_date) == False:
            print(f'    Skipping processing for {this_drop_date} due to file errors')
            files['done'].append(this_drop_date)  # Mark as done to move to next
            elapsed_file = _time.time() - file_start
            file_times.append(elapsed_file)
            if overall_pbar:
                overall_pbar.update(1)
                overall_pbar.set_postfix_str(_eta_summary(file_num, total_input_files, file_times))
            continue

        consolidate(this_drop_date)
        flatten(this_drop_date)
        compare(this_drop_date)

        elapsed_file = _time.time() - file_start
        file_times.append(elapsed_file)

        eta_line   = _eta_summary(file_num, total_input_files, file_times)
        _INV, _RST = '\033[7m', '\033[0m'
        files_left = total_input_files - file_num
        done_line  = (f'File {_INV}{file_num}/{total_input_files}{_RST} '
                      f'({this_drop_date}) done in {do_elapsed(file_start)}'
                      f'  —  {_INV}{files_left} left{_RST}  {eta_line}')
        print(f'\n    {done_line}')

        # Keep sticky header current with the just-completed file
        _sticky_text = (f'DONE {file_num}/{total_input_files}  '
                        f'({this_drop_date})  —  {files_left} left  {eta_line}')
        _sticky.update(_sticky_text)

        if overall_pbar:
            overall_pbar.update(1)
            overall_pbar.set_postfix_str(eta_line)

    if overall_pbar:
        overall_pbar.close()
    _sticky.close()

    # Create final merged output file
    create_final_merged_file()

    elapsed = do_elapsed(elapsed_begin)
    print()
    print("="*80)
    print(f"PROCESSING COMPLETE - Total time: {elapsed}")
    print("="*80)
    print()

    # Print final error summary
    print_errors_summary()


if __name__ == "__main__":
    try:
        run_all()
    except KeyboardInterrupt:
        print("\n\nProcess interrupted by user")
        exit_script()
    except Exception as e:
        error(f"Unexpected error in main: {type(e).__name__}: {e}")
        import traceback
        traceback.print_exc()
        exit_script()