#!/usr/bin/env python3
'''
Enhanced VAERS Data Processing Script (data-fidelity corrected)
This script enhances the original VAERS processing with:
1. Multi-core parallel processing
2. Memory-efficient chunked data handling
3. Command-line arguments for COVID vs full dataset
4. Progress bars for all major operations
5. Improved error collection and reporting
6. Fixed stats functionality
vaers36.py: corrected version of vaers35.py. Primary fixes:
- set_columns_order now exactly matches orig.py (consistent physical column order + layout with Gary's outputs).
- symptoms_dedupe_repeat_sentences (parallel path) now has safety fallback: if the worker result map produces blank/NaN for a VID that had content, the original pre-dedupe text is preserved. This prevents the data loss in SYMPTOM_TEXT (and potentially other long fields) that was observed in vaers35.py outputs for many heavily-edited / deleted+restored reports (the parallel result assembly + map had no guard, unlike the original apply() path).
Usage:
python vaers36.py --dataset covid --cores 8
python vaers36.py --dataset full --cores 16 --chunk-size 100000
Original by Gary Hawkins - http://univaers.com/download/
Enhanced + data-fidelity fixes 2025/2026
'''
import argparse
import atexit
import glob
import json
import os
import sys
import re
import shutil
import pprint
import subprocess as sp
import inspect
from collections import Counter
from datetime import datetime
from string import punctuation
from pathlib import Path
from multiprocessing import Pool, cpu_count
from functools import partial
from concurrent.futures import ThreadPoolExecutor
import warnings
import numpy as np
import pandas as pd
import time as _time
# Progress bar
try:
    from tqdm import tqdm
    TQDM_AVAILABLE = True
except ImportError:
    print("Warning: tqdm not installed. Install with: pip install tqdm")
    print("Continuing without progress bars...")
    TQDM_AVAILABLE = False

    class tqdm:
        """No-op stand-in used when the real tqdm package is not installed.

        It only passes the wrapped iterable through and accepts (and ignores)
        the calls this script makes on a progress bar, so callers do not need
        to branch on TQDM_AVAILABLE.
        """

        def __init__(self, iterable=None, *args, **kwargs):
            # Extra positional/keyword options (desc, total, unit, ...) are ignored.
            self.iterable = iterable

        def __iter__(self):
            # Compare against None instead of relying on truthiness: numpy
            # arrays and pandas objects raise on bool(), and an iterable that
            # merely evaluates falsy must still be iterated.
            if self.iterable is None:
                return iter(())
            return iter(self.iterable)

        def __enter__(self):
            return self

        def __exit__(self, *args):
            # Returns None, so exceptions raised inside the with-block propagate.
            pass

        def update(self, n=1):
            pass

        def close(self):
            # Present so callers that close a manually-driven bar keep working.
            pass

        @staticmethod
        def write(s, file=None, end="\n"):
            # Accepts the file/end keywords so call sites written for the real
            # tqdm.write() also work against this fallback.
            print(s, end=end, file=file)
# Zip file handling
try:
    # Preferred: same API as the standard zipfile module, imported under the
    # same name so the rest of the script does not care which one is active.
    import zipfile_deflate64 as zipfile
except ImportError:
    print("Warning: zipfile_deflate64 not found. Using standard zipfile.")
    print("Install with: pip install zipfile-deflate64 for better compatibility")
    import zipfile
# Shared pretty-printer for ad-hoc debugging dumps of nested structures.
pp = pprint.PrettyPrinter(indent=4)
# Suppress specific warnings (FutureWarning only; everything else still shows)
warnings.filterwarnings('ignore', category=FutureWarning)
# =============================================================================
# LOG FILE MANAGEMENT
# Each run archives the previous vaers_processing.log to a timestamped name
# (start-time_to_end-time) then opens a fresh log for this run.
# =============================================================================
# Log for the current run; rotated to a timestamped name at the next start.
_LOG_FILE = 'vaers_processing.log'
# JSON side-car holding the 'start' and 'end' timestamps of the current run.
_META_FILE = 'vaers_processing.meta'
# The active _Tee instance, or None when stdout is not being mirrored.
_run_tee = None
class _Tee:
    """Mirror stdout to a log file so all print() output is captured.

    Constructing an instance installs it as sys.stdout; close() restores the
    previous stream and closes the log file.
    """
    def __init__(self, filepath):
        # Remember the stream being replaced so output can still reach it.
        self._orig = sys.stdout
        # buffering=1 requests line buffering for the text-mode log file.
        self._log = open(filepath, 'w', encoding='utf-8', buffering=1)
        # Re-entrancy guard, set while write() is forwarding to the terminal.
        self._in_write = False
        sys.stdout = self
    def write(self, text):
        """Write text to the log file, then forward it to the original stream."""
        self._log.write(text)
        if self._in_write:
            # Inside a tqdm.write() call — write to terminal directly, avoid recursion
            self._orig.write(text)
            return
        self._in_write = True
        # Temporarily restore the real stdout so anything tqdm prints while
        # handling this text goes to the terminal rather than back into us.
        sys.stdout = self._orig
        try:
            # Route through tqdm.write() only when real tqdm is installed and
            # its '_instances' attribute is truthy (NOTE(review): presumably
            # the set of live bars — confirm against tqdm's API).
            if TQDM_AVAILABLE and text and getattr(tqdm, '_instances', None):
                tqdm.write(text, end='', file=self._orig)
            else:
                self._orig.write(text)
        finally:
            # Always re-install the tee and clear the guard, even on error.
            sys.stdout = self
            self._in_write = False
    def flush(self):
        """Flush both the original stream and the log file."""
        self._orig.flush()
        self._log.flush()
    def isatty(self):
        """Report whether the original stream is a terminal."""
        return self._orig.isatty()
    def close(self):
        """Restore the original stdout (if still installed) and close the log."""
        if sys.stdout is self:
            sys.stdout = self._orig
        try:
            self._log.close()
        except Exception:
            # Best-effort: a failure to close the log must not break shutdown.
            pass
class _StickyHeader:
    """Keep a single status line pinned to the top row of the terminal.

    On construction the scroll region is limited to rows 2..N, so ordinary
    output scrolls beneath row 1. Each update() repaints row 1 in reverse
    video. When the given stream is not a terminal every method is a no-op.
    """
    _INV = '\033[7m'  # reverse-video
    _RST = '\033[0m'

    def __init__(self, tty):
        self._tty = tty
        self._text = ''
        self._active = hasattr(tty, 'isatty') and tty.isatty()
        if not self._active:
            return
        size = shutil.get_terminal_size(fallback=(80, 24))
        self._cols, self._rows = size
        # Scroll region = rows 2..N, then park the cursor on row 2.
        tty.write(f'\033[2;{self._rows}r\033[2;1H')
        tty.flush()

    def update(self, text):
        """Replace the status text and repaint it."""
        if not self._active:
            return
        width = shutil.get_terminal_size(fallback=(80, 24))[0]
        # Clip to one row, leaving a small margin.
        self._text = text[:width - 2]
        self._draw()

    def _draw(self):
        """Paint the current text on row 1 without moving the caller's cursor."""
        if not self._active:
            return
        # ESC 7 saves the cursor; go to row 1, clear it, write inverted; ESC 8 restores.
        painted = f'\0337\033[1;1H\033[2K{self._INV} {self._text} {self._RST}\0338'
        self._tty.write(painted)
        self._tty.flush()

    def close(self):
        """Restore the full-screen scroll region and blank the status row."""
        if not self._active:
            return
        height = shutil.get_terminal_size(fallback=(80, 24))[1]
        self._tty.write(f'\033[1;{height}r\0337\033[1;1H\033[2K\0338')
        self._tty.flush()
def _compact_ts(ts):
    """Turn 'YYYY-MM-DD HH:MM:SS' into 'YYYYMMDDTHHMMSS'; falsy input gives ''."""
    if not ts:
        return ''
    without_dashes = ts.replace('-', '')
    with_t = without_dashes.replace(' ', 'T')
    return with_t.replace(':', '')
def _rotate_log():
    """Archive the previous run's log under a start/end timestamped name.

    The timestamps come from the meta file when it is readable. If the meta
    file has no start time, both tags are taken from the log file's own
    ctime/mtime. If only the end time is missing (the meta file is written
    with an empty 'end' at start-up, so a run that never reached the exit
    hook leaves it blank), the log's mtime is used as the end tag instead of
    producing a name that ends in '_to_.log'.

    Returns:
        The archive filename, or None when there was no log to rotate or the
        rename failed.
    """
    if not os.path.exists(_LOG_FILE):
        return None
    start_tag = end_tag = ''
    if os.path.exists(_META_FILE):
        try:
            with open(_META_FILE) as f:
                meta = json.load(f)
            start_tag = _compact_ts(meta.get('start', ''))
            end_tag = _compact_ts(meta.get('end', ''))
        except Exception:
            # Best-effort: an unreadable meta file falls back to file times.
            pass
    if not start_tag or not end_tag:
        st = os.stat(_LOG_FILE)
        mtime_tag = datetime.fromtimestamp(st.st_mtime).strftime('%Y%m%dT%H%M%S')
        if not start_tag:
            # No usable meta data at all: derive both ends from the file.
            start_tag = datetime.fromtimestamp(st.st_ctime).strftime('%Y%m%dT%H%M%S')
            end_tag = mtime_tag
        else:
            # Start is known but the run never recorded its end time.
            end_tag = mtime_tag
    archive = f'vaers_processing_{start_tag}_to_{end_tag}.log'
    try:
        os.rename(_LOG_FILE, archive)
        return archive
    except Exception as e:
        # stdout is not yet being mirrored at this point, so report on stderr.
        sys.stderr.write(f'Warning: could not archive old log: {e}\n')
        return None
def _finish_logging():
    """Stamp the run's end time into the meta file and shut the Tee down.

    Registered with atexit; does nothing when no Tee is active.
    """
    global _run_tee
    tee = _run_tee
    if tee is None:
        return
    finished_at = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
    try:
        with open(_META_FILE) as src:
            record = json.load(src)
        record['end'] = finished_at
        with open(_META_FILE, 'w') as dst:
            json.dump(record, dst)
    except Exception:
        # Best-effort: a missing/corrupt meta file must not block shutdown.
        pass
    tee.close()
    _run_tee = None
def _start_logging():
    """Rotate the old log, start mirroring stdout, and record the start time.

    Also registers _finish_logging with atexit so the end time is recorded
    and the log closed when the interpreter exits.
    """
    global _run_tee
    previous = _rotate_log()
    _run_tee = _Tee(_LOG_FILE)
    if previous:
        print(f'Previous log archived as: {previous}')
    started_at = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
    try:
        with open(_META_FILE, 'w') as meta_out:
            json.dump({'start': started_at, 'end': ''}, meta_out)
    except Exception:
        # Best-effort: logging still works without the meta side-car.
        pass
    atexit.register(_finish_logging)


# Logging starts at import time so every later print() is captured.
_start_logging()
# =============================================================================
# INTERACTIVE MODE
# =============================================================================
def _interactive_is_date(text):
    """Return True when text is a zero-padded YYYY-MM-DD string naming a real date."""
    # The regex enforces zero padding (dates are compared as strings later);
    # strptime rejects impossible dates such as 2021-13-45 or 2022-02-30.
    if not re.match(r'^\d{4}-\d{2}-\d{2}$', text):
        return False
    try:
        datetime.strptime(text, '%Y-%m-%d')
    except ValueError:
        return False
    return True


def _interactive_ask_int(prompt, default, minimum, invalid_msg):
    """Prompt for an integer; blank gives default, junk prints invalid_msg and gives default."""
    val = input(prompt).strip()
    if val == '':
        return default
    try:
        return max(minimum, int(val))
    except ValueError:
        print(invalid_msg)
        return default


def _interactive_ask_date(prompt, fallback, invalid_msg):
    """Prompt for a date; blank gives fallback, an invalid date prints invalid_msg and gives fallback."""
    val = input(prompt).strip()
    if val == '':
        return fallback
    if _interactive_is_date(val):
        return val
    print(invalid_msg)
    return fallback


def _interactive_ask_flag(prompt, accepted):
    """Prompt for a yes/no style answer; True when the lowercased reply is in accepted."""
    return input(prompt).strip().lower() in accepted


def interactive_mode():
    """Prompt for all options when the script is run with no arguments.

    Returns:
        dict with keys dataset, cores, chunk_size, date_floor, date_ceiling,
        merge_only, no_progress and test. date_floor is None when the
        dataset-specific default should apply.

    Exits the process with status 0 if the user declines the final
    confirmation.
    """
    print()
    print("=" * 80)
    print(" VAERS Data Processor — Interactive Setup")
    print(" (Pass --help to see all flags, or supply flags directly to skip this)")
    print("=" * 80)
    print()
    print(" Press Enter at any prompt to accept the shown default.")
    print()
    r = {}
    # --- dataset ---
    print(" [1] Dataset")
    print(" covid Process COVID-19 era reports from Dec 2020 onward (faster)")
    print(" full Process all VAERS reports from 1990 onward (slower/larger)")
    val = input(" Choice [covid/full] (default: covid): ").strip().lower()
    r['dataset'] = val if val in ('covid', 'full') else 'covid'
    if val not in ('covid', 'full', ''):
        print(f" Unrecognised value '{val}', using default: covid")
    # --- cores ---
    default_cores = cpu_count()
    print()
    print(f" [2] CPU Cores (system has {default_cores} logical cores)")
    print(" More cores = faster processing, higher RAM usage.")
    r['cores'] = _interactive_ask_int(
        f" Number of cores (default: {default_cores}): ",
        default_cores, 1,
        f" Invalid value, using default: {default_cores}")
    # --- chunk-size ---
    print()
    print(" [3] Chunk Size (rows read per pass for large files)")
    print(" Lower if you hit memory errors; higher for faster I/O.")
    r['chunk_size'] = _interactive_ask_int(
        " Chunk size (default: 50000): ",
        50000, 1000,
        " Invalid value, using default: 50000")
    # --- date-floor ---
    # None means "use the dataset default"; the caller resolves it.
    default_floor = '2020-12-13' if r['dataset'] == 'covid' else '1990-01-01'
    print()
    print(" [4] Date Floor (earliest report date to include, YYYY-MM-DD)")
    r['date_floor'] = _interactive_ask_date(
        f" Date floor (default: {default_floor}): ",
        None,
        f" Invalid date format, using default: {default_floor}")
    # --- date-ceiling ---
    print()
    print(" [5] Date Ceiling (latest report date to include, YYYY-MM-DD)")
    r['date_ceiling'] = _interactive_ask_date(
        " Date ceiling (default: 2025-01-01): ",
        '2025-01-01',
        " Invalid date format, using default: 2025-01-01")
    # --- merge-only ---
    print()
    print(" [6] Merge-Only Mode")
    print(" Skip all processing — just create the final merged output file.")
    r['merge_only'] = _interactive_ask_flag(" Enable? [y/N] (default: N): ", ('y', 'yes'))
    # --- no-progress ---
    print()
    print(" [7] Progress Bars (requires tqdm)")
    r['no_progress'] = _interactive_ask_flag(" Show progress bars? [Y/n] (default: Y): ", ('n', 'no'))
    # --- test ---
    print()
    print(" [8] Test Mode (use z_test_cases/ instead of live data)")
    r['test'] = _interactive_ask_flag(" Enable test mode? [y/N] (default: N): ", ('y', 'yes'))
    # --- summary + confirm ---
    floor_display = r['date_floor'] or default_floor
    print()
    print("=" * 80)
    print(" Configuration Summary")
    print("=" * 80)
    print(f" Dataset : {r['dataset']}")
    print(f" Cores : {r['cores']}")
    print(f" Chunk size : {r['chunk_size']:,}")
    print(f" Date floor : {floor_display}")
    print(f" Date ceiling : {r['date_ceiling']}")
    print(f" Merge only : {'Yes' if r['merge_only'] else 'No'}")
    print(f" Progress bars : {'No' if r['no_progress'] else 'Yes'}")
    print(f" Test mode : {'Yes' if r['test'] else 'No'}")
    print()
    confirm = input(" Proceed with these settings? [Y/n]: ").strip().lower()
    if confirm in ('n', 'no'):
        print("\n Aborted. Run again with --help to see all available flags.\n")
        sys.exit(0)
    print()
    return r
# =============================================================================
# COMMAND LINE ARGUMENTS
# =============================================================================
# Build the command-line interface. Help text and defaults are shown by --help.
parser = argparse.ArgumentParser(
    description='Process VAERS data with memory optimization and parallel processing',
    formatter_class=argparse.RawDescriptionHelpFormatter,
    epilog='''
Examples:
%(prog)s --dataset covid --cores 8
%(prog)s --dataset full --cores 16 --chunk-size 100000
%(prog)s --dataset covid --date-floor 2021-01-01
''',
)
parser.add_argument(
    '--dataset', choices=['covid', 'full'], default='covid',
    help='Process COVID-19 era data only (default) or full historical dataset',
)
parser.add_argument(
    '--cores', type=int, default=cpu_count(),
    help=f'Number of CPU cores to use (default: {cpu_count()})',
)
parser.add_argument(
    '--chunk-size', type=int, default=50000,
    help='Chunk size for processing large datasets (default: 50000)',
)
parser.add_argument(
    '--date-floor', type=str, default=None,
    help='Earliest date to process (default: 2020-12-13 for COVID, 1990-01-01 for full)',
)
parser.add_argument(
    '--date-ceiling', type=str, default='2025-01-01',
    help='Latest date to process (default: 2025-01-01)',
)
parser.add_argument('--test', action='store_true', help='Use test cases directory')
parser.add_argument('--no-progress', action='store_true', help='Disable progress bars')
parser.add_argument(
    '--merge-only', action='store_true',
    help='Only create the final merged file, skip all processing',
)
args = parser.parse_args()
# With no arguments at all, fall back to the interactive questionnaire. Its
# result keys (dataset, cores, chunk_size, date_floor, date_ceiling,
# merge_only, no_progress, test) are exactly the argparse destination names,
# so they can be copied onto the namespace in one step.
if len(sys.argv) == 1:
    _i = interactive_mode()
    vars(args).update(_i)
# =============================================================================
# CONFIGURATION
# =============================================================================
# Dataset-specific configuration
# Resolve the earliest date: an explicit --date-floor wins, otherwise the
# default depends on the dataset.
if args.date_floor is not None:
    date_floor = args.date_floor
elif args.dataset == 'full':
    date_floor = '1990-01-01'
else:
    date_floor = '2020-12-13'  # Just before first COVID vaccine
print(f"\n{'='*80}")
if args.dataset == 'full':
    print(f"Processing FULL VAERS dataset from {date_floor}")
else:
    print(f"Processing COVID-19 ERA dataset from {date_floor}")
print(f"{'='*80}\n")
date_ceiling = args.date_ceiling
NUM_CORES = args.cores
CHUNK_SIZE = args.chunk_size
# Progress bars need both the user's consent and an importable tqdm.
SHOW_PROGRESS = not args.no_progress and TQDM_AVAILABLE
print("Configuration:")
print(f" CPU cores: {NUM_CORES}")
print(f" Chunk size: {CHUNK_SIZE:,} rows")
print(f" Date range: {date_floor} to {date_ceiling}")
if SHOW_PROGRESS:
    print(" Progress bars: Enabled")
else:
    print(" Progress bars: Disabled")
print()
# =============================================================================
# DIRECTORIES AND FILES
# =============================================================================
# All paths hang off dir_top: the test-case tree in --test mode, else the CWD.
use_test_cases = args.test
dir_top = 'z_test_cases' if use_test_cases else '.'
# In test mode the raw inputs live in 'drops' instead of the download folder.
dir_input = f'{dir_top}/drops' if use_test_cases else f'{dir_top}/0_VAERS_Downloads'
dir_working = f'{dir_top}/1_vaers_working'
dir_consolidated = f'{dir_top}/1_vaers_consolidated'
dir_compared = f'{dir_top}/2_vaers_full_compared'
dir_flattened = f'{dir_top}/3_vaers_flattened'
# Bookkeeping files kept alongside the working directories.
file_stats = f'{dir_top}/stats.csv'
file_never_published = f'{dir_top}/never_published_any.txt'
file_ever_any = f'{dir_top}/ever_published_any.txt'
file_ever_covid = f'{dir_top}/ever_published_covid.txt'
file_writeups_deduped = f'{dir_top}/writeups_deduped.txt'
# =============================================================================
# GLOBAL VARIABLES
# =============================================================================
# Testing filter: when non-empty, files_from_zip() deletes extracted CSVs
# whose path does not contain the listed substrings.
files_limit = []
# NOTE(review): presumably a VAERS_ID filter for testing; not referenced in
# the portion of the file reviewed here.
vids_limit = []
autodownload = 0
tones = 0
# One-shot flags so files_populate_information() prints each notice only once.
floor_notice_printed = 0
ceiling_notice_printed = 0
# Seed for stats['lo_ever'] in stats_initialize().
covid_earliest_vaers_id = 0
# Wall-clock markers: whole run (used by exit_script) and, presumably, the
# current drop.
elapsed_begin = _time.time()
elapsed_drop = _time.time()
# Global dataframes
df_vax = pd.DataFrame()
df_data = pd.DataFrame()
df_syms_flat = pd.DataFrame()
df_flat_1 = pd.DataFrame()
df_flat_2 = pd.DataFrame()
df_flat_prv = pd.DataFrame()
df_stats = pd.DataFrame()
# Collections
errors = []  # Messages accumulated by error(), printed by print_errors_summary()
# Punctuation characters; note the apostrophe is not included.
punctuations = '!"#$%&()*+,-./:;<=>?@[\\]^_`{|}~'
never_published_any = {}
ever_covid = {}  # VAERS_ID -> 1 for every COVID report seen (see do_ever_covid)
ever_any = {}
writeups_deduped = {}
files = {}  # Directory bookkeeping filled in by files_populate_information()
dict_done_flag = {}
stats = {}  # Per-drop counters, reset by stats_initialize()
# Column names of the combined VAERS record, in output order.
columns_vaers = [
    'VAERS_ID', 'AGE_YRS', 'SEX', 'STATE', 'SPLTTYPE',
    'DIED', 'L_THREAT', 'ER_VISIT', 'ER_ED_VISIT', 'HOSPITAL', 'DISABLE', 'BIRTH_DEFECT', 'OFC_VISIT',
    'VAX_TYPE', 'VAX_MANU', 'VAX_LOT', 'VAX_DOSE_SERIES', 'VAX_ROUTE', 'VAX_SITE', 'VAX_NAME',
    'DATEDIED', 'VAX_DATE', 'RPT_DATE', 'RECVDATE', 'TODAYS_DATE', 'ONSET_DATE',
    'NUMDAYS', 'HOSPDAYS', 'X_STAY', 'RECOVD',
    'CAGE_YR', 'CAGE_MO', 'V_ADMINBY', 'V_FUNDBY', 'FORM_VERS', 'PRIOR_VAX',
    'CUR_ILL', 'OTHER_MEDS', 'ALLERGIES', 'HISTORY', 'LAB_DATA', 'SYMPTOM_TEXT'
]
# =============================================================================
# ERROR HANDLING - IMPROVED
# =============================================================================
def error(the_error):
    """Record an error for the end-of-run summary and echo it immediately.

    The stored entry is prefixed with the current local time; the echoed
    line is not.
    """
    stamp = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
    errors.append(f"[{stamp}] {the_error}")
    print(f'ERROR: {the_error}')
def print_errors_summary():
    """Print every collected error, or a success banner when there were none."""
    rule = "=" * 80
    if not errors:
        print("\n" + rule)
        print("PROCESSING COMPLETED SUCCESSFULLY - No errors encountered")
        print(rule + "\n")
        return
    print("\n" + rule)
    print("ERRORS SUMMARY")
    print(rule)
    number = 1
    for message in errors:
        print(f"{number}. {message}")
        number += 1
    print(rule)
    print(f"\nTotal errors: {len(errors)}\n")
# =============================================================================
# UTILITY FUNCTIONS
# =============================================================================
def do_elapsed(marker_in):
    """Format the time elapsed since marker_in (a time.time() value).

    Returns 'Xh Ym Zs', 'Ym Zs' or 'Zs', omitting leading zero units.
    """
    elapsed = _time.time() - marker_in
    hours = int(elapsed / 3600)
    minutes = int((elapsed % 3600) / 60)
    seconds = int(elapsed % 60)
    if hours > 0:
        pieces = (f"{hours}h", f"{minutes}m", f"{seconds}s")
    elif minutes > 0:
        pieces = (f"{minutes}m", f"{seconds}s")
    else:
        pieces = (f"{seconds}s",)
    return " ".join(pieces)
def line():
    """Print an 80-character row of '=' as a visual separator."""
    separator = '=' * 80
    print(separator)
def single_plural(count, word):
    """Return word unchanged when count is exactly 1, otherwise word + 's'."""
    if count == 1:
        return f"{word}"
    return f"{word}s"
def exit_script(_in=None):
    """Print an optional message, the error summary and total runtime, then exit.

    The process exit status is 1 when any errors were collected, else 0.
    """
    if _in:
        print(f"\n{_in}\n")
    print_errors_summary()
    runtime = do_elapsed(elapsed_begin)
    print(f"\nTotal runtime: {runtime}")
    status = 1 if errors else 0
    sys.exit(status)
# =============================================================================
# FILE I/O FUNCTIONS - ENHANCED WITH CHUNKING AND PROGRESS
# =============================================================================
def open_file_to_df(filename, doprint=1, use_chunks=False):
    """Read a CSV file into a DataFrame, chunking large files.

    Args:
        filename: Path of the CSV to read.
        doprint: When truthy, print a one-line summary (name, max VAERS_ID,
            row count) instead of showing a progress bar.
        use_chunks: Force chunked reading; files over 100 MB are always
            read in chunks of CHUNK_SIZE rows.

    Returns:
        The DataFrame with missing values replaced by '' and column types
        normalised by types_set(). An empty DataFrame is returned when the
        file is missing, empty or unreadable; the problem is recorded via
        error() rather than raised.
    """
    df = None
    try:
        if doprint:
            print(f' Reading {os.path.basename(filename):>50}', flush=True, end='')
        # Determine if we should use chunks based on file size
        file_size_mb = os.path.getsize(filename) / (1024 * 1024) if os.path.exists(filename) else 0
        should_chunk = use_chunks or file_size_mb > 100  # Chunk if >100MB
        # NOTE(review): the handle is already decoded as utf-8-sig here, so the
        # encoding='ISO-8859-1' passed to read_csv below presumably has no
        # effect on an open text handle — confirm before relying on it.
        with open(filename, encoding='utf-8-sig', errors='replace') as f:
            if should_chunk:
                # Read in chunks for large files
                chunks = []
                chunk_iter = pd.read_csv(
                    f, index_col=None, header=0, sep=',',
                    engine='python', encoding='ISO-8859-1',
                    on_bad_lines='warn', chunksize=CHUNK_SIZE
                )
                if SHOW_PROGRESS and not doprint:
                    chunk_iter = tqdm(chunk_iter, desc=f"Reading {os.path.basename(filename)}",
                                      unit='chunk', leave=True)
                for chunk in chunk_iter:
                    chunks.append(chunk)
                df = pd.concat(chunks, ignore_index=True).fillna('')
            else:
                # Read entire file for small/medium files
                df = pd.read_csv(
                    f, index_col=None, header=0, sep=',',
                    engine='python', encoding='ISO-8859-1',
                    on_bad_lines='warn'
                ).fillna('')
        if doprint and df is not None:
            max_vid = 'ok'
            if 'VAERS_ID' in df.columns:
                try:
                    max_vid = f'max ID: {df.VAERS_ID.astype(int).max():>7}'
                except Exception:
                    # Display-only value: unparseable IDs must not abort the
                    # read, but Ctrl-C / SystemExit should still propagate
                    # (a bare except would swallow them).
                    max_vid = 'ID parse error'
            print(f' ... {max_vid:>20} {len(df):>8,} rows')
    except FileNotFoundError:
        error(f'File not found: {filename}')
    except pd.errors.EmptyDataError:
        error(f'Empty file: {filename}')
        df = pd.DataFrame()
    except Exception as e:
        error(f'Error reading {filename}: {type(e).__name__}: {e}')
    if df is not None:
        df = types_set(df)
    return df if df is not None else pd.DataFrame()
def files_concat(files_list):
    """Read several CSV files in a process pool and stack them into one DataFrame.

    Whether workers read in chunks is decided from the size of the first
    file only (over 100 MB). Empty or failed reads are dropped. Returns an
    empty DataFrame when there is nothing to combine.
    """
    if not files_list:
        return pd.DataFrame()
    file_count = len(files_list)
    workers = min(NUM_CORES, file_count)
    suffix = "s" if file_count != 1 else ""
    print(f' Concatenating {file_count} file{suffix} across {workers} cores...')
    try:
        use_chunks = os.path.getsize(files_list[0]) / (1024 * 1024) > 100
    except Exception:
        use_chunks = False
    jobs = [(path, use_chunks) for path in files_list]
    with Pool(workers) as pool:
        if not SHOW_PROGRESS:
            results = pool.map(_read_file_for_pool, jobs)
        else:
            lazy_results = pool.imap(_read_file_for_pool, jobs)
            results = list(tqdm(lazy_results, total=file_count,
                                desc="Reading files", unit="file", leave=True))
    frames = []
    for frame in results:
        if frame is not None and len(frame) > 0:
            frames.append(frame)
    if not frames:
        return pd.DataFrame()
    print(f' Combining {len(frames)} dataframes...', end=' ', flush=True)
    combined = pd.concat(frames, ignore_index=True)
    print(f'{len(combined):,} total rows')
    return combined
def write_to_csv(df, full_filename, open_file=0, ignore_dupes=0):
    """Write a DataFrame to CSV (utf-8-sig), serializing large frames in parallel.

    Args:
        df: DataFrame to write. None or empty frames are skipped with a warning.
        full_filename: Destination path.
        open_file: When truthy, try to open the written file with the
            system's default handler afterwards (best-effort).
        ignore_dupes: Accepted for compatibility; not used by this function.

    Failures are recorded via error() rather than raised.
    """
    global _g_df_to_serialize
    if df is None or len(df) == 0:
        print(f' Warning: Empty dataframe, not writing {full_filename}')
        return
    try:
        # For very large dataframes, serialize chunks in parallel then write sequentially.
        # The frame is handed to the workers through a module global so that only
        # tiny (start, stop) index pairs go over IPC. NOTE(review): this relies
        # on a fork-based Pool inheriting the global — confirm for platforms
        # that use spawn.
        if len(df) > CHUNK_SIZE * 2:
            _slices = [(i, min(i + CHUNK_SIZE, len(df))) for i in range(0, len(df), CHUNK_SIZE)]
            _workers = min(NUM_CORES, len(_slices))
            print(f' Writing {len(df):,} rows ({len(_slices)} chunks, {_workers} cores)...', end=' ', flush=True)
            _g_df_to_serialize = df
            try:
                with Pool(_workers) as pool:
                    if SHOW_PROGRESS:
                        serialized = list(tqdm(
                            pool.imap(_serialize_csv_slice, _slices),
                            total=len(_slices), desc="Serializing", unit="chunk", leave=True
                        ))
                    else:
                        serialized = pool.map(_serialize_csv_slice, _slices)
            finally:
                # Always drop the global reference, even when a worker fails,
                # so the (large) frame is not kept alive after this call.
                _g_df_to_serialize = None
            # Single sequential write — preserves row order, utf-8-sig BOM via open()
            with open(full_filename, 'w', encoding='utf-8-sig') as f:
                f.write(df.iloc[:0].to_csv(index=False))  # header row
                for s in serialized:
                    f.write(s)
            print('Done')
        else:
            # Write entire dataframe for small/medium files
            df.to_csv(full_filename, index=False, encoding='utf-8-sig')
        # Optionally open the file
        if open_file:
            try:
                # NOTE(review): shell=True with a filename is only safe while
                # full_filename is never built from untrusted input.
                sp.Popen(full_filename, shell=True)
            except Exception as e:
                # Best-effort convenience; the file itself was written fine.
                print(f' Could not open {full_filename}: {e}')
    except Exception as e:
        error(f'Error writing to {full_filename}: {e}')
# =============================================================================
# DATA TYPE HANDLING - IMPROVED
# =============================================================================
def types_set(df):
    """Normalise column dtypes in place and return the same DataFrame.

    VAERS_ID and cell_edits become int64 (unparseable values become 0).
    AGE_YRS, CAGE_YR, CAGE_MO, NUMDAYS and HOSPDAYS become numeric with
    unparseable values as NaN. None or empty input is returned unchanged;
    conversion failures are recorded via error().
    """
    if df is None or len(df) == 0:
        return df
    # Identifier/count columns: always whole numbers, missing -> 0.
    for int_col in ('VAERS_ID', 'cell_edits'):
        if int_col not in df.columns:
            continue
        try:
            as_number = pd.to_numeric(df[int_col], errors='coerce')
            df[int_col] = as_number.fillna(0).astype('int64')
        except Exception as e:
            error(f"Error converting {int_col} to int64: {e}")
    # Measurement columns: numeric, missing stays NaN.
    for num_col in ('AGE_YRS', 'CAGE_YR', 'CAGE_MO', 'NUMDAYS', 'HOSPDAYS'):
        if num_col not in df.columns:
            continue
        try:
            df[num_col] = pd.to_numeric(df[num_col], errors='coerce')
        except Exception as e:
            error(f"Error converting {num_col} to numeric: {e}")
    return df
def fix_date_format(df):
    """Rewrite slash-style dates in the known date columns as YYYY-MM-DD.

    Only columns that contain at least one '/' are touched. A trailing
    ' cut_...' marker is stripped first; values that cannot be parsed become
    ''. Frames without VAERS_ID, or with a 'gapfill' column, are returned
    untouched, as is None or empty input.
    """
    if df is None or len(df) == 0:
        return df
    if 'VAERS_ID' not in df.columns or 'gapfill' in df.columns:
        return df
    announced = False
    for col in ('DATEDIED', 'VAX_DATE', 'RPT_DATE', 'RECVDATE', 'TODAYS_DATE', 'ONSET_DATE'):
        if col not in df.columns:
            continue
        # Skip columns where nothing needs conversion.
        has_slash = df[col].astype(str).str.contains('/', na=False)
        if not has_slash.any():
            continue
        if not announced:
            print(f'{"":>40} Converting dates to YYYY-MM-DD format')
            announced = True
        try:
            # Remove cut_ markers
            df[col] = df[col].str.replace(' cut_.*', '', regex=True)
            # Convert to datetime then format; unparseable values end up as ''
            df[col] = pd.to_datetime(df[col], errors='coerce').dt.strftime('%Y-%m-%d')
            df[col] = df[col].fillna('')
        except Exception as e:
            error(f"Error converting date column {col}: {e}")
    return df
def warn_mixed_types(df):
    """Placeholder for a mixed-type column check; currently reports nothing.

    Walks the object-dtype columns but takes no action on them yet.
    """
    if df is None or len(df) == 0:
        return
    for col in df.columns:
        is_object_column = df[col].dtype == 'object'
        if not is_object_column:
            continue
        # Could add more sophisticated type checking here
# =============================================================================
# STATS FUNCTIONS - FIXED
# =============================================================================
def stats_initialize(date_currently):
    """Reset the global stats dict for a new drop dated date_currently.

    All counters start at 0, except lo_ever, which is seeded from
    covid_earliest_vaers_id, and columns, which is an empty Counter. Key
    order is kept stable.
    """
    global stats
    fresh = {'date': date_currently}
    for key in ('drop_input_covid', 'comparisons', 'deleted', 'restored', 'modified'):
        fresh[key] = 0
    fresh['lo_ever'] = covid_earliest_vaers_id
    for key in ('hi_ever', 'dedupe_count', 'dedupe_reports', 'dedupe_bytes',
                'dedupe_max_bytes', 'dedupe_max_vid', 'gapfill', 'cells_edited',
                'cells_emptied', 'trivial_changes_ignored'):
        fresh[key] = 0
    fresh['columns'] = Counter()
    stats = fresh
def stats_resolve(date_currently):
    """Merge the current drop's stats into the stats CSV and refresh the totals.

    Any existing row for date_currently and the previous 'All' row are
    replaced. The new 'All' row takes the minimum of 'lo_' columns, the
    maximum of 'hi_' columns and the sum of every other numeric column.
    Does nothing when stats is empty; failures are recorded via error().
    """
    global df_stats
    try:
        if not stats:
            return
        # Ensure stats directory exists
        os.makedirs(os.path.dirname(file_stats), exist_ok=True)
        # Start from the existing file when it can be read, else from scratch.
        if not os.path.exists(file_stats):
            print(f' Creating new {file_stats}')
            df_stats = pd.DataFrame()
        else:
            try:
                df_stats = pd.read_csv(file_stats, encoding='utf-8-sig')
            except Exception as e:
                print(f' Creating new {file_stats} (previous read failed: {e})')
                df_stats = pd.DataFrame()
        # Drop the stale row for this date and the old totals row.
        if 'date' in df_stats.columns:
            is_other_drop = (df_stats['date'] != date_currently) & (df_stats['date'] != 'All')
            df_stats = df_stats[is_other_drop]
        # Append this drop's row.
        df_stats = pd.concat([df_stats, pd.DataFrame([stats])], ignore_index=True)
        # Build the 'All' totals row over the numeric columns.
        totals = {'date': 'All'}
        for col in df_stats.select_dtypes(include=[np.number]).columns.tolist():
            values = df_stats[col]
            if col.startswith('lo_'):
                totals[col] = values.min()
            elif col.startswith('hi_'):
                totals[col] = values.max()
            else:
                totals[col] = values.sum()
        df_stats = pd.concat([df_stats, pd.DataFrame([totals])], ignore_index=True)
        # Write to file
        df_stats.to_csv(file_stats, index=False, encoding='utf-8-sig')
    except Exception as e:
        error(f"Error in stats_resolve: {e}")
# =============================================================================
# TRACKING FUNCTIONS
# =============================================================================
def do_ever_covid(vids_all_covid_list):
    """Add the given COVID VAERS_IDs to the persistent ever-seen list.

    The tracking file (one ID per line) is reloaded when it exists, the new
    IDs are merged in, and the file is rewritten in sorted order. Failures
    are recorded via error().
    """
    global ever_covid
    try:
        # Reload what has been recorded so far, ignoring non-numeric tokens.
        if os.path.exists(file_ever_covid):
            with open(file_ever_covid, 'r') as f:
                tokens = f.read().strip().split()
            known_ids = [int(tok) for tok in tokens if tok.isdigit()]
            ever_covid = {vid: 1 for vid in known_ids}
        # Merge in anything not seen before.
        unseen = [vid for vid in vids_all_covid_list if vid not in ever_covid]
        if unseen:
            print(f'{len(unseen):>10} new COVID reports added to ever_covid tracking')
            for vid in vids_all_covid_list:
                ever_covid[vid] = 1
        # Persist the full sorted list.
        with open(file_ever_covid, 'w') as f:
            for vid in sorted(ever_covid):
                f.write(f"{vid}\n")
    except Exception as e:
        error(f"Error in do_ever_covid: {e}")
def do_never_ever(vids_present, date_currently, source):
    """Placeholder for tracking VAERS IDs that have never been published.

    Currently a no-op: the arguments are accepted and ignored, nothing is
    read or written, and None is returned.
    """
    # Implementation similar to original
    # This is a complex function - keeping core logic
    return None
# =============================================================================
# DIRECTORY AND FILE MANAGEMENT
# =============================================================================
def validate_dirs_and_files():
    """Create any missing working directories and check that inputs exist.

    Returns:
        True when at least one .zip or .csv file is found anywhere under the
        input directory; False otherwise (the reason is recorded via error()).
    """
    print("Validating directories...")
    for folder in (dir_input, dir_working, dir_consolidated, dir_compared, dir_flattened):
        if os.path.exists(folder):
            continue
        print(f" Creating directory: {folder}")
        os.makedirs(folder, exist_ok=True)
    # Check for input files
    if not os.path.exists(dir_input):
        error(f"Input directory does not exist: {dir_input}")
        return False
    input_files = []
    for extension in ('zip', 'csv'):
        input_files.extend(glob.glob(f"{dir_input}/**/*.{extension}", recursive=True))
    if input_files:
        print(f" Found {len(input_files)} input files")
        return True
    error(f"No input files (.zip or VAERS*.csv) found in {dir_input}")
    print(f" Searched in: {dir_input}")
    print(" Please place VAERS data files in this directory")
    return False
# =============================================================================
# MAIN PROCESSING PLACEHOLDER
# The following functions are integrated from the original script
# with enhancements for progress bars and chunked processing
#===============================================================================
# Integrated from original: files_populate_information()
def files_populate_information():
    ''' Bookkeeping info. Often updating in 'files' dictionary

    Rebuilds, for each of the categories input/working/flattened/changes/
    consolidated, the entries 'date' (sorted unique dates), 'files' (paths),
    'keyval' (date -> path) and 'valkey' (path -> date) from the dated .csv
    and .zip files currently found under that category's '_dir'. The 'input'
    category is then restricted to the date_floor..date_ceiling window.
    '''
    global floor_notice_printed, ceiling_notice_printed
    if not files: # make the keys
        for x in ['input', 'working', 'flattened', 'changes', 'consolidated']:
            files[x] = {}
            for y in ['date', 'files']:
                files[x][y] = []
            # set _dir
            if x == 'input': files[x]['_dir'] = dir_input
            elif x == 'working': files[x]['_dir'] = dir_working
            elif x == 'changes': files[x]['_dir'] = dir_compared
            elif x == 'flattened': files[x]['_dir'] = dir_flattened
            elif x == 'consolidated': files[x]['_dir'] = dir_consolidated
    if 'done' not in files:
        files['done'] = []
    # current values of files in directories
    for thing in list(files.keys()):
        if thing == 'done': continue # an outlier added later
        _dir = files[thing]['_dir']
        # every path with an extension anywhere under _dir, sorted
        full = sorted( [y for x in os.walk(_dir) for y in glob.glob(os.path.join(x[0], '*' + '.*'))] )
        # note other files/dirs can be there without a problem, only .csv or .zip are picked up
        full = [x for x in full if re.search(r'^.*(\d{4}\-\d{2}\-\d{2}).*', x)] # file must contain a date like 2020-12-24, making sure
        full = [linux_path(x) for x in full] # to forward slashes
        full = [x for x in full if (x.lower().endswith('.csv') or x.lower().endswith('.zip'))] # can be csv or zip
        full = [x for x in full if not (x.lower().endswith('_a.csv') or x.lower().endswith('_b.csv'))] # screening out once-upon-a-time tests
        # date only like 2020-12-24
        files[thing]['date'] = sorted( set( [date_from_filename(x) for x in full] ) ) # uniquing in the case of test cases CSV inputs
        # date to either the zip file or directory name
        # Build iteratively so a collision (two files sharing the same date key) is
        # logged rather than silently overwriting the first file.
        keyval = {}
        for x in full:
            date_key = date_from_filename(x)
            if date_key in keyval:
                error(f"Date key collision in '{thing}': {date_key} matches both "
                      f"'{os.path.basename(keyval[date_key])}' and '{os.path.basename(x)}'. "
                      f"Using {os.path.basename(x)}.")
            # The later path (in sorted order) wins on collision.
            keyval[date_key] = x
        files[thing]['keyval'] = keyval
        files[thing]['valkey'] = {x : date_from_filename(x) for x in full}
        files[thing]['files' ] = list(files[thing]['valkey'].keys())
    # Hack for testing when input files are only flattened files rather than in drops dir.
    # Note this aliases the same dict object, so the date filtering below
    # also changes files['flattened'] in test mode.
    if use_test_cases:
        files[ 'input' ] = files[ 'flattened' ]
    do_file_limits = 0
    if date_floor:
        do_file_limits = 1
        if not floor_notice_printed:
            print(f'\n\n\n\t\t date_floor is set at { date_floor}, limiting files\n\n')
            floor_notice_printed = 1
    if date_ceiling:
        do_file_limits = 1
        if not ceiling_notice_printed:
            print(f'\n\n\n\t\t date_ceiling is set at {date_ceiling}, limiting files\n\n')
            ceiling_notice_printed = 1
    if do_file_limits: # remove those that don't apply
        # Dates are compared as strings.
        if date_floor:
            files['input']['date'] = [x for x in files['input']['date'] if x >= date_floor]
        if date_ceiling:
            files['input']['date'] = [x for x in files['input']['date'] if x <= date_ceiling]
        for y in ['input']: # only this
            files[y]['date'] = [x for x in files[y]['date'] if x in files['input']['date']]
            files[y]['keyval'] = {k:v for k, v in files[y]['keyval'].items() if k in files['input']['date']}
            files[y]['valkey'] = {k:v for k, v in files[y]['valkey'].items() if v in files['input']['date']}
            files[y]['files' ] = list(files[y]['valkey'].keys())
    #pp.pprint(files)
    return
# Integrated from original: files_from_zip()
def files_from_zip(zip_file, dir_dst):
    ''' Extract the yearly (name contains 4 digits) and NonDomestic members of zip_file into ./dir_dst.

        This requires ... pip install zipfile-deflate64 ... to handle zip straight from https://vaers.hhs.gov/data/datasets.html
        See https://stackoverflow.com/a/73040025/962391
        The alternative is to unzip and rezip to get away from compression type 9 (deflate64), a licensing issue

        When the module-level files_limit is set (testing), csv files in dir_working that match
        none of its entries are deleted afterwards; NonDomestic files are always kept.

        Returns True on success. On any failure the problem is printed, recorded with error()
        and False is returned so the caller can move on to the next file. '''
    try:
        archive = zipfile.ZipFile(zip_file)
    except Exception as e:
        error_msg = f'Failed to open zip file {zip_file}: {e}'
        print(f' Exception: {e} at zipfile.ZipFile({zip_file})')
        print(f' Skipping this file and continuing with next...')
        error(error_msg)
        return False # Indicate failure
    print(f' unzip {zip_file}')
    try:
        # The context manager closes the archive handle even if an extract fails.
        with archive:
            for file in archive.namelist(): # only 2020... and NonDomestic files
                if re.search(r'\d{4}\D', file) or file.lower().startswith('nond'):
                    archive.extract(file, './' + dir_dst)
        if files_limit: # In testing with files_limit, remove those that don't apply
            print(f' files_limit = {files_limit}')
            files_all = [linux_path(x) for x in glob.glob(dir_working + '/' + '*.csv')]
            for file in files_all:
                if 'nond' in file.lower():
                    continue
                # Remove once, and only when no limit matches. Testing each limit separately
                # deleted everything as soon as two limits were given and then failed on the
                # second os.remove() of the same path.
                if not any(x in file for x in files_limit):
                    print(f' remove file {file}')
                    os.remove(file)
        # The zip itself is deliberately left in place (do not remove zip_file here).
        return True # Indicate success
    except Exception as e:
        error_msg = f'Failed to extract files from {zip_file}: {e}'
        print(f' Exception during extraction: {e}')
        print(f' Skipping this file and continuing with next...')
        error(error_msg)
        return False # Indicate failure
# Integrated from original: get_files_date_marker()
def get_files_date_marker(_dir):
    ''' Date taken from the name of a file in directory _dir whose name contains a hyphen.

        Returns '' when the directory holds no such file, otherwise whatever
        date_from_filename() yields for the first match (None, after a notice, if that is empty). '''
    hyphenated = glob.glob('./' + _dir + '/' + '*-*')
    if len(hyphenated) == 0:
        return ''
    # any will do, thus the first in the list returned (even if just one of course)
    date_found = date_from_filename(hyphenated[0])
    if not date_found:
        print(' EMPTY in get_files_date_marker')
        return None
    return date_found
# Integrated from original: set_files_date_marker()
def set_files_date_marker(_dir, filename):
    ''' For visual clarity, make sure _dir holds an empty file named after the date in filename.

        Returns None when the marker already exists or was just created, and a
        diagnostic string when no date part could be derived from filename. '''
    date_part = date_from_filename(filename)
    marker_dir = _dir + '/'
    if not date_part:
        print(f' FAILED creating in {marker_dir} date marker file {date_part}')
        return 'EMPTY IN set_files_date_marker, must fix if hit'
    marker_path = marker_dir + date_part
    if not os.path.exists(marker_path):
        print(f' Creating in {marker_dir} date marker file {date_part}')
        open(marker_path, 'a').close() # touch: create empty, never truncate
    return None
# Integrated from original: date_from_filename()
def date_from_filename(filename):
    ''' Reduce filename to its YYYY-MM-DD portion (the last one when several are present).

        A name without any such date comes back unchanged. '''
    date_pattern = re.compile(r'.*(\d{4}\-\d{2}\-\d{2}).*')
    return date_pattern.sub(r'\1', filename)
# Integrated from original: set_columns_order()
def set_columns_order(df):
    ''' Bring the well-known report columns to the front of df, in the order listed below.

        Frames that carry a 'gapfill' column (stats.csv) are returned untouched. '''
    if 'gapfill' in df.columns:
        return df # skip stats.csv
    # EXACT match to vaers-orig.py for consistent output column order and layout
    # with reference Gary outputs (so first-N by cell_edits, header positions,
    # and FINAL_MERGED physical layout are directly comparable).
    leading = [
        'cell_edits', 'status', 'changes', 'AGE_YRS', 'SEX', 'STATE', 'DIED',
        'SPLTTYPE', 'SYMPTOM_TEXT',
        'L_THREAT', 'ER_VISIT', 'ER_ED_VISIT', 'HOSPITAL', 'DISABLE', 'BIRTH_DEFECT', 'OFC_VISIT',
        'RPT_DATE', 'RECVDATE', 'TODAYS_DATE', 'ONSET_DATE', 'VAX_DATE', 'DATEDIED',
    ]
    if use_test_cases:
        leading.insert(0, 'FORM_VERS')
    # Each move puts the column right at the front, so walk the list backwards
    # for the columns to end up in the order written above.
    for name in reversed(leading):
        if name in df:
            df = move_column_forward(df, name)
    return df
# Integrated from original: save_multi_csv()
def save_multi_csv(date_currently, df):
    ''' Save df as two csv files (..._FLATFILE_A.csv / ..._FLATFILE_B.csv in dir_compared)
        when it has more rows than fit on one spreadsheet sheet.

        Does nothing except print a notice when df fits on a single sheet. Returns None. '''
    # A sheet holds 1,048,576 rows including the header row, so 1,048,575 data rows.
    max_data_rows = 1048575
    if len(df) <= max_data_rows:
        print(f'Multiple csv save not necessary, output of {len(df)} rows to csv are within single sheet limit')
        return
    df = set_columns_order(df.copy())
    # Split by position so every row lands in exactly one file, even if a VAERS_ID
    # appears on both sides of the boundary.
    df_a = df.iloc[:max_data_rows]
    df_b = df.iloc[max_data_rows:]
    filename_a = f'{dir_compared}/{date_currently}_VAERS_FLATFILE_A.csv'
    filename_b = f'{dir_compared}/{date_currently}_VAERS_FLATFILE_B.csv'
    print(f'Saving {filename_a}, {len(df_a)} rows')
    write_to_csv(df_a, filename_a, open_file=1)
    print(f' and {filename_b}, {len(df_b)} rows')
    write_to_csv(df_b, filename_b, open_file=0)
    ### Currently broken, writes file but is corrupted
    ###save_xlsx(df, 'XLSX_VAERS_FLATFILE.xlsx')
# Integrated from original: save_xlsx()
def save_xlsx(df, filename):
''' Write df to the Excel workbook `filename` with the openpyxl engine, one sheet per entry of `sheets`.
Marked by its author as currently broken (the file is written but corrupted); kept for reference.
Works on a copy of df with a fresh 0..n-1 index; returns None. '''
# Class-level pandas setting: drop the default header cell styling for Excel output.
pd.io.formats.excel.ExcelFormatter.header_style = None
df = df.copy()
df = df.reset_index(drop=True)
# Currently broken, writes file but is corrupted
# what a mess ...
# The bare string below is an abandoned unidecode-based approach, kept only as a note (never executed).
'''
from unidecode import unidecode
def FormatString(s):
if isinstance(s, unicode):
try:
s.encode('ascii')
return s
except:
return unidecode(s)
else:
return s
df = df.map(FormatString)
'''
print(' Unicode decode')
# Every str cell is rewritten with non-ASCII and control characters as backslash escape sequences.
df = df.map(lambda x: x.encode('unicode_escape').decode('utf-8') if isinstance(x, str) else x)
print(f' Saving to {filename}')
# First sheet: at most 1048575 data rows.
df1 = df.head(1048575)
# NOTE(review): df2 is only assigned when df has more than 1048576 rows, yet `sheets` refers to it.
# Depending on the (lost) indentation either df2 or sheets is undefined for smaller frames - confirm.
if len(df) > 1048576:
df2 = df.tail(len(df) - (1048575 + 1))
sheets = {
'First million or so rows': df1,
'Remaining rows': df2,
}
# Two more inert strings: an earlier openpyxl experiment and a note about IllegalCharacterError.
'''
with pd.ExcelWriter('test.xlsx', engine='openpyxl') as writer:
df1.to_excel(writer, sheet_name = 'Tab1', index = False)
df2.to_excel(writer, sheet_name = 'Tab2', index = False)
'''
'''
New problem with openpyxl:
openpyxl.utils.exceptions.IllegalCharacterError: ... benadryl allergy. Aroun??d 11 pm t ... '''
#print(' ILLEGAL_CHARACTERS_RE')
#ILLEGAL_CHARACTERS_RE = re.compile(r'[\000-\010]|[\013-\014]|[\016-\037]')
#df = df.map(lambda x: re.sub(ILLEGAL_CHARACTERS_RE, '{}', x) if isinstance(x, str) else x)
#df = df.map(lambda x: ILLEGAL_CHARACTERS_RE.sub(r'', x) if isinstance(x, str) else x)
print(f' Writing {filename}')
# The write that actually happens: each sheet to its own tab, no index column.
with pd.ExcelWriter(filename, engine='openpyxl') as writer:
for sheet in sheets:
sheets[ sheet ].to_excel(writer, sheet_name = sheet, index = False)
# Inert string: header/cell styling attempt, not executed.
'''
writer.book.use_zip64()
workbook = writer.book
#workbook.use_zip64()
##workbook.use_zip64() # due to size threshold hit
header_styles = workbook.add_format({'font_name': 'Arial', 'font_size': 10, 'bold': False})
cell_styles = workbook.add_format({ # was test, now unused, didn't take why? Re-try, not yet tested.
'font_name' : 'Arial',
'font_size' : 10,
'bold' : False,
})
cell_styles = header_styles
for sheet in sheets:
sheets[sheet].to_excel(writer, sheet_name=sheet, index=False)
worksheet = writer.sheets[ sheet ]
for col_num, value in enumerate(sheets[sheet].columns.values):
worksheet.write(0, col_num, value, cell_styles)
worksheet.set_row(0, None, header_styles)
'''
# The function ends here; the xlsxwriter variant after this return is never reached.
return
# engine options: openpyxl or
with pd.ExcelWriter(filename, engine='xlsxwriter') as writer:
workbook = writer.book
##workbook.use_zip64() # due to size threshold hit
header_styles = workbook.add_format({'font_name': 'Arial', 'font_size': 10, 'bold': False})
cell_styles = workbook.add_format({ # was test, now unused, didn't take why? Re-try, not yet tested.
'font_name' : 'Arial',
'font_size' : 10,
'bold' : False,
})
for sheet in sheets:
sheets[sheet].to_excel(writer, sheet_name=sheet, index=False)
worksheet = writer.sheets[ sheet ]
#workbook.use_zip64() # due to size threshold hit. Here instead??? https://stackoverflow.com/a/48095021/962391 ... FAIL
for col_num, value in enumerate(sheets[sheet].columns.values):
worksheet.write(0, col_num, value, cell_styles)
worksheet.set_row(0, None, header_styles)
workbook.close() # help?
# Integrated from original: get_flattened()
def get_flattened(previous_or_working, raw_date):
    ''' Flattened file, previous or [newest] working [for compare]
        Depends on the files dictionary having been populated.

        previous_or_working: 'previous' for the most recent flattened file dated before raw_date,
                             'working' for the flattened file of raw_date itself.
        Returns the file path, or '' for 'previous' when there is no earlier flattened file.
        Exits the program when no flattened files are known or the 'working' file is missing.
        Raises ValueError for any other value of previous_or_working. '''
    flattened = files['flattened']
    flattened_all_dates = sorted(flattened['date'])
    if not flattened_all_dates:
        sys.exit(f'\n\n\n\n\t\t\t ERROR: No flattened files using raw_date {raw_date}, unexpected \n\n\n\n')
    if previous_or_working == 'previous':
        candidates_previous = [x for x in flattened_all_dates if x < raw_date]
        if candidates_previous:
            # most recent flattened file before this current being worked on, to compare to
            return flattened['keyval'][candidates_previous[-1]]
        return '' # can be first run, no previous flattened file, or first matching date_floor
    if previous_or_working == 'working':
        if raw_date in flattened['keyval']:
            return flattened['keyval'][raw_date] # this current being worked on
        sys.exit(f" ERROR: Expected {raw_date} in files['flattened']['keyval']")
    # The old index-based fallback that used to follow could only be reached with an
    # unsupported selector and then ended in an unbound variable; fail clearly instead.
    raise ValueError(f"get_flattened() expects 'previous' or 'working', got {previous_or_working!r}")
# Integrated from original: linux_path()
def linux_path(x):
    ''' Return path string x with every backslash turned into a forward slash. '''
    backslash = re.compile(r'\\')
    return backslash.sub('/', x)
# Integrated from original: subrange()
def subrange(list_in, _max):
    ''' For printing: a short view of list_in.

        Empty input gives '' (i.e. not empty square brackets), a list of at most _max items
        is handed back as is, and anything longer becomes a string with the item count
        followed by up to _max items from the start and from the end of the sorted list. '''
    total = len(list_in)
    if total == 0:
        return ''
    if total <= _max:
        return list_in
    ordered = sorted(list_in)
    edge = int(max(min(total / 2, _max), 1))
    if total > 1:
        head = f'{ordered[:edge]}'
        tail = f'{ordered[-edge:]}'
    else:
        head = tail = ''
    head = re.sub(r'\]', '', head) # open-ended on the right
    tail = re.sub(r'\[', '', tail) # open-ended on the left
    return f'{total:>7} {head} ... {tail}'
# Integrated from original: drop_dupes()
def drop_dupes(df):
    ''' Return df without rows that are complete duplicates (identical in every column),
        keeping the last of each. There should be none; a notice is printed with the number
        dropped when there are. The input frame is not modified. '''
    len_before = len(df) # No duplicate lines error check
    df = df.drop_duplicates(keep='last') # Should be none. Overwrite of values now done
    dropped = len_before - len(df)
    if dropped:
        print(f'{dropped:>10} duplicates dropped in df_data on VAERS_IDs, SHOULD NOT HAVE HAPPENED, in write_to_csv')
    # A former `if not open:` exit path was removed: `open` there was the builtin (always
    # truthy), so it never ran, and it referred to names that do not exist in this function.
    # The result used to be discarded; returning it lets callers actually use the clean frame.
    return df
# Integrated from original: nan_alert()
def nan_alert(df_in, col=None):
''' nan stands for not-a-number, meaning null, check to see if any are present in the dataframe (df_in)
df_in: DataFrame to inspect; anything else is reported and 0 is returned.
col: optional column name; when given only that column is checked (a VAERS_ID column is selected alongside it).
Returns 1 when col is given and holds NaN; otherwise count_nans, which is 0 when nothing was found
or df_in is empty, and otherwise a row count or the 999999 sentinel set further down. '''
# Type check by name rather than isinstance.
if not 'DataFrame' in str(type(df_in)):
print(f' nan_alert() expect a DataFrame, got {str(type(df_in))}, skipping')
return 0
count_nans = 0
if not len(df_in):
count_nans = 0
elif col is not None: # if column specified, otherwise entire df_in
df_partial = df_in.copy()[ ['VAERS_ID', col] ]
columns_w_nans = df_partial.columns[df_partial.isna().any()].tolist()
if col in columns_w_nans:
print(f'ERROR: {col} with Nans UNEXPECTED')
return 1
else:
# Whole-frame check: names of all columns that contain at least one NaN.
columns_w_nans = df_in.columns[df_in.isna().any()].tolist()
if columns_w_nans:
# inspect.stack()[1][2] is the line number of the caller, to locate where the NaNs were seen.
print(f'\n\n\t nans in {columns_w_nans} line {inspect.stack()[1][2]}\n\n')
try:
df_with_nans = df_in[df_in.isna().any(axis=1)] # TODO: Truly just rows with nan? https://stackoverflow.com/questions/43424199/display-rows-with-one-or-more-nan-values-in-pandas-dataframe
count_nans = len(df_with_nans)
print(f' rows {count_nans}')
## TODO this, return df_in ... df_in.loc[df_in.VAERS_ID.isin(df_with_nans.VAERS_ID), 'trace' ] += f' . NaN'
except Exception as e:
print(df_in)
error(f"df_in[df_in.isna().any(axis=1)] {e}")
# NOTE(review): indentation is lost here, so it is unclear whether the three statements below
# belong to the except handler or run whenever NaN columns exist (which would always replace
# the counted rows with 999999) - confirm against the original file.
error(f'rows with NANS {inspect.stack()[1][2]}')
print(f'\n\n\n\n\t\t\t rows with NANS line {inspect.stack()[1][2]} \n {df_in} \n\n\n\n')
count_nans = 999999
return count_nans
# Integrated from original: lookup()
def lookup(previous_date, date_currently, vid_cols_affected):
    ''' A tool for adhoc on-the-fly debugging while at a breakpoint.

        vid_cols_affected maps VAERS_ID -> columns of interest. For the two 'changes' files of
        previous_date and date_currently, the rows of those VAERS_IDs and the union of those
        columns are stacked (tagged with the file date) and written to lookup_changes.csv. '''
    print()
    print(f'debug lookup() on vids affected')
    files_populate_information()
    changes_by_date = files['changes']['keyval']
    compared_files = [changes_by_date[previous_date], changes_by_date[date_currently]]
    vids_wanted = list(vid_cols_affected.keys())
    # Union of all affected columns, first-seen order.
    cols_wanted = list(dict.fromkeys(
        col for cols in vid_cols_affected.values() for col in cols
    ))
    df_changes = pd.DataFrame()
    for path in compared_files:
        df_one = open_file_to_df(path, doprint=0).fillna('')
        df_one = df_one.loc[df_one.VAERS_ID.isin(vids_wanted)]
        df_one = df_one.copy()[['VAERS_ID'] + cols_wanted]
        df_one['date'] = date_from_filename(path)
        df_changes = pd.concat(
            [df_changes.reset_index(drop=True), df_one.reset_index(drop=True)],
            ignore_index=True,
        )
    df_changes = fix_date_format(df_changes)
    df_changes = move_column_forward(df_changes, 'date')
    df_changes = df_changes.sort_values(by=['VAERS_ID', 'date'])
    write_to_csv(df_changes, 'lookup_changes.csv', open_file=1)
    print()
# Integrated from original: do_replace()
def do_replace(d, col, tag, this, that):
    ''' Not essential, a utility for testing some prep for harvesting numbers.

        Returns a copy of d in which column col has had the regular expression `this`
        replaced by `that`, ignoring case. `tag` only labels the progress line. '''
    print( f' do_replace {col} {tag:>40} {this:>40} {that}' )
    replaced = d.copy() # leave the caller's frame alone
    replaced[col] = replaced[col].str.replace(this, that, flags=re.IGNORECASE, regex=True)
    return replaced
# Integrated from original: symptoms_file_entries_append_to_symptom_text()
def symptoms_file_entries_append_to_symptom_text(df_symfile):
    ''' Append symptoms entries from SYMPTOMS files to SYMPTOM_TEXT in symptoms column
        Files like 2023VAERSSYMPTOMS.csv

        Reduces df_symfile to one row per VAERS_ID with a single symptom_entries column holding
        the symptom values joined with the _|_ delimiter, and returns that new frame.
        Side effect, as before: the frame passed in loses its non-symptom columns and gains a
        symptom_entries column. '''
    # limit to only SYMPTOM1 thru SYMPTOM5 (plus VAERS_ID); the ...VERSION columns go as well
    unwanted = [c for c in df_symfile.columns
                if 'VAERS_ID' not in c and ('SYMPTOM' not in c or 'VERSION' in c)]
    if unwanted:
        df_symfile.drop(columns=unwanted, inplace=True)

    # Combine the symptoms column values to one string in a column called 'symptom_entries'
    # Step 1: Five columns to just one
    print(' Combining symptoms column items. Grouping with delimiters, single row per VAERS_ID ...')
    cols_symfile = sorted(set(df_symfile.columns) - {'VAERS_ID'})
    print(f' Appending each symptom in new column called symptom_entries ({NUM_CORES} cores)')
    # Pass DataFrame column-subset chunks to workers so astype(str)+sort run in parallel
    _df_cols = df_symfile[cols_symfile]
    _chunk_sz = max(1, len(_df_cols) // NUM_CORES)
    _df_chunks = [_df_cols.iloc[i:i + _chunk_sz] for i in range(0, len(_df_cols), _chunk_sz)]
    _joined = []
    if _df_chunks: # an empty file gives no chunks, and Pool(0) raises ValueError
        with Pool(min(NUM_CORES, len(_df_chunks))) as pool:
            if SHOW_PROGRESS:
                _joined = list(tqdm(
                    pool.imap(_join_symptom_rows_chunk, _df_chunks),
                    total=len(_df_chunks), desc=" Joining symptoms", unit="chunk", leave=True
                ))
            else:
                _joined = pool.map(_join_symptom_rows_chunk, _df_chunks)
    df_symfile['symptom_entries'] = [entry for chunk in _joined for entry in chunk]

    # Step 2: Multiple VAERS_ID rows to just 1 row each
    df_symfile = df_symfile.reset_index(drop=True)
    df_symfile = df_symfile[['VAERS_ID', 'symptom_entries']]
    print(f' Grouping by VAERS_ID ({NUM_CORES} cores)')
    _vids_u = df_symfile['VAERS_ID'].unique()
    if NUM_CORES > 1 and len(_vids_u) > NUM_CORES * 50:
        # Assign each unique VAERS_ID a chunk index via modulo, then map to all rows and
        # groupby that index, so all rows of one VAERS_ID land in the same chunk.
        _chunk_map = dict(zip(_vids_u, np.arange(len(_vids_u)) % NUM_CORES))
        df_symfile['_chunk'] = df_symfile['VAERS_ID'].map(_chunk_map)
        _sym_chunks = [grp.drop(columns='_chunk').reset_index(drop=True)
                       for _, grp in df_symfile.groupby('_chunk', sort=False)
                       if len(grp)]
        df_symfile.drop(columns='_chunk', inplace=True)
        with Pool(min(NUM_CORES, len(_sym_chunks))) as pool:
            if SHOW_PROGRESS:
                _grp_results = list(tqdm(
                    pool.imap(_groupby_syms_chunk, _sym_chunks),
                    total=len(_sym_chunks), desc=" Grouping VAERS_ID", unit="chunk", leave=True
                ))
            else:
                _grp_results = pool.map(_groupby_syms_chunk, _sym_chunks)
        df_symfile = pd.concat(_grp_results, ignore_index=True)
    else:
        df_symfile = (df_symfile.astype(str)
                      .groupby('VAERS_ID').agg(list)
                      .map('_|_'.join).reset_index())

    print(' Cleaning multiple delimiters due to empty columns')
    # The stray third positional argument (False) of the former calls landed in the `n`
    # (max replacements) parameter; it is dropped so the default "replace all" applies.
    df_symfile['symptom_entries'] = df_symfile.symptom_entries.str.replace(r'\s*_\|_\s*(?:\s*_\|_\s*)+\s*', '_|_', regex=True) # multiples to single (where empty columns)
    # _|_Always at beginning and end_|_ ... like a box or cell
    df_symfile['symptom_entries'] = df_symfile.symptom_entries.str.replace(r'^(?!_\|_)(.*)$', r'_|_\1', regex=True) # _|_ at start of line always
    # NOTE(review): the lookahead in the next pattern is tested at the very end of the string, where
    # it always succeeds, so a delimiter is appended even when one is already there. Left unchanged
    # to keep output identical to earlier runs; a lookbehind (?<!_\|_)$ would avoid the doubling.
    df_symfile['symptom_entries'] = df_symfile.symptom_entries.str.replace(r'^(.*)(?!_\|_)$', r'\1_|_', regex=True) # _|_ at end of line always

    # "Both . xlsx and . csv files have a limit of 32,767 characters per cell"
    too_long = df_symfile.symptom_entries.str.len() >= 32720
    if too_long.any():
        df_long_cells = df_symfile.loc[too_long].copy() # copy: assigning into a slice is unreliable
        print(f'{len(df_long_cells):>10} over 32720 in length to be truncated')
        # Cut to 32720 characters and then tag. The former regex '^(.{32720,}).*' was greedy,
        # kept the whole text, and wrote literal backslashes around the brackets.
        df_long_cells['symptom_entries'] = (
            df_long_cells.symptom_entries.str.slice(0, 32720)
            + ' [truncated, Excel cell size limit 32,767]'
        )
        df_symfile = copy_column_from_1_to_2_per_vids('symptom_entries', df_long_cells, df_symfile, df_long_cells.VAERS_ID.to_list())
    return df_symfile
# Integrated from original: make_numeric()
def make_numeric(df_in, col):
    ''' Return a copy of df_in with column col as float64 rounded to 4 decimals.

        Empty/missing values become 0.0 (number columns usually should be empty when 0,
        setting to 0.0 during the run, empty them later). Values that cannot be read as a
        number are reported and also set to 0.0 instead of aborting the run.
        An empty frame is returned as is. '''
    # There's been a lot of struggling with types and in the hands of garyha (me) in 2025 restart, still not all solved.
    if not len(df_in):
        return df_in
    df_in = df_in.copy()
    as_text = df_in[col].fillna('').astype(str)
    # errors='coerce' turns anything unparsable (including '') into NaN. Previously a parse
    # failure was only printed and the float conversion right after it raised on the same values.
    numbers = pd.to_numeric(as_text, errors='coerce')
    unparsable = numbers.isna() & (as_text.str.strip() != '')
    if unparsable.any():
        print(f'pd.to_numeric could not parse {int(unparsable.sum())} value(s) in {col}, set to 0.0, e.g. {as_text[unparsable].iloc[0]!r}')
    df_in[col] = numbers.fillna(0.0).astype('float64').round(4)
    return df_in
# Integrated from original: copy_column_from_1_to_2_per_vids()
def copy_column_from_1_to_2_per_vids(column, df1, df2, vids): # vids are VAERS_IDs
    ''' Copy the values of `column` from df1 into df2 for the VAERS_IDs in vids that both frames have.

        Columns that df1 has and df2 lacks are first added to df2 as empty strings (this is
        done on the frame passed in). Returns a new frame: the df2 rows for vids first,
        followed by the remaining df2 rows when there are any. '''
    for col in df1.columns:
        if col not in df2.columns:
            df2[col] = '' # Avoid nans adding column empty string if didn't already exist
            print(f' {col} made empty string in df2')
    in_vids = df2.VAERS_ID.isin(vids)
    df2_sequestered_not_vids = df2.loc[~in_vids] # to be untouched
    df1 = df1.loc[df1.VAERS_ID.isin(vids)]
    df2 = df2.loc[in_vids].copy()
    list_vids_both_only_to_copy = sorted(set(df1.VAERS_ID.to_list()) & set(df2.VAERS_ID.to_list()))
    # VAERS_ID -> new value; only rows whose id is in both frames are assigned.
    df2.loc[df2.VAERS_ID.isin(list_vids_both_only_to_copy), column] = df2['VAERS_ID'].map(
        pd.Series(df1[column].values, index=df1.VAERS_ID).to_dict()
    )
    if len(df2_sequestered_not_vids):
        # Add back any that were in 2 but not in vids
        df2 = pd.concat([
            df2.reset_index(drop=True),
            df2_sequestered_not_vids.reset_index(drop=True),
        ], ignore_index=True)
    # The older implementation that used to follow this return could never run and was removed.
    return df2
# Integrated from original: move_rows()
def move_rows(df_subset, df_move_from, df_move_to):
    ''' Move rows in df_subset out of df_move_from into df_move_to
        Return the new df_move_from and df_move_to

        With an empty df_subset or df_move_from both inputs are handed back untouched. '''
    nothing_to_move = len(df_subset) == 0 or len(df_move_from) == 0
    if nothing_to_move:
        return df_move_from, df_move_to
    df_move_to = df_move_to.copy()
    if len(df_move_to) == 0:
        # Start from the subset instead of concatenating onto an empty frame, which avoids
        # the pandas FutureWarning about concatenation with empty or all-NA entries.
        df_move_to = df_subset.copy()
    else:
        df_move_to = pd.concat(
            [df_move_to.reset_index(drop=True), df_subset.reset_index(drop=True)],
            ignore_index=True,
        )
    # everything from before, not now in done
    still_pending = ~df_move_from.VAERS_ID.isin(df_move_to.VAERS_ID)
    return df_move_from.loc[still_pending], df_move_to
# Integrated from original: move_column_forward()
def move_column_forward(df_in, column):
    ''' Reorder columns moving column to second, right after VAERS_ID.

        When df_in has no VAERS_ID column the column is moved to first place instead.
        df_in is returned unchanged when it lacks the column or already has it in second place. '''
    if column not in df_in:
        return df_in
    columns_pre = list(df_in.columns)
    # Length guard: a single-column frame has no second position to look at.
    if len(columns_pre) > 1 and columns_pre[1] == column:
        return df_in # already in that spot
    if 'VAERS_ID' not in columns_pre:
        return df_in[[column] + [c for c in columns_pre if c != column]]
    # dict.fromkeys de-duplicates while keeping order, so asking to move VAERS_ID itself
    # no longer lists it twice (which made reindex emit the column twice).
    col_order = list(dict.fromkeys(['VAERS_ID', column, *columns_pre]))
    return df_in.reindex(col_order, axis=1)
# Integrated from original: check_dupe_vaers_id()
def check_dupe_vaers_id(df):
    ''' Return 1 (after printing a warning with the caller's line number) when any VAERS_ID
        occurs more than once in df, otherwise 0. None, an empty frame, or a frame without
        a VAERS_ID column also give 0. '''
    if df is None or len(df) == 0 or 'VAERS_ID' not in df.columns:
        return 0
    occurrences = Counter(df.VAERS_ID)
    dupes_list = [vid for vid, seen in occurrences.items() if seen > 1]
    if not dupes_list:
        return 0
    print(f'\n\nline {inspect.stack()[ 1 ][ 2 ]} WARNING: {len(dupes_list)} D U P L I C A T E VAERS_IDs: {subrange(dupes_list, 6)}\n\n')
    return 1
# Integrated from original: prv_new_error_check()
def prv_new_error_check(df_prv, df_new):
    ''' Ensuring the VAERS_IDs are identical in each

        Returns None when both frames hold exactly the same VAERS_IDs the same number of times.
        Returns 1 (after printing the differences) when the sets of ids differ.
        Exits the program when the sets agree but one frame repeats an id. '''
    vids_in_prv = df_prv.VAERS_ID.to_list()
    vids_in_new = df_new.VAERS_ID.to_list()

    def _print_sizes():
        print(f' len prv: {len(vids_in_prv)}')
        print(f' len new: {len(vids_in_new)}')
        print(f' Difference: {len(vids_in_new) - len(vids_in_prv)}')

    def _repeated_ids(frame):
        # value_counts on a one-column frame keys by 1-tuples, hence key[0]
        counts = frame[['VAERS_ID']].value_counts().to_dict()
        return [key[0] for key, seen in counts.items() if seen > 1]

    vax_cols = ['VAERS_ID', 'VAX_LOT', 'VAX_MANU', 'VAX_TYPE', 'VAX_NAME']

    if sorted(set(vids_in_prv)) != sorted(set(vids_in_new)):
        print()
        print(f' ERROR: Expected VAERS_ID in prv and new to match exactly')
        _print_sizes()
        vids_in_prv_not_in_new = df_prv.loc[~df_prv.VAERS_ID.isin(df_new.VAERS_ID)].VAERS_ID.to_list()
        vids_in_new_not_in_prv = df_new.loc[~df_new.VAERS_ID.isin(df_prv.VAERS_ID)].VAERS_ID.to_list()
        print(f' In new not in prv: {vids_in_new_not_in_prv}')
        print(f' In prv not in new: {vids_in_prv_not_in_new}')
        print()
        return 1

    if sorted(vids_in_prv) != sorted(vids_in_new): # as lists
        print()
        print(f' ERROR: VAERS_ID in prv are the same sets but there are multiple VAERS_ID in one')
        _print_sizes()
        list_multiples_prv = _repeated_ids(df_prv)
        if list_multiples_prv:
            print(f' VAERS_IDs: {list_multiples_prv}')
            print(f" df_prv.loc[ df_prv.VAERS_ID.isin({list_multiples_prv}) ][['VAERS_ID', 'VAX_LOT', 'VAX_MANU', 'VAX_TYPE', 'VAX_NAME']]")
            df_multiples_prv = df_prv.loc[df_prv.VAERS_ID.isin(list_multiples_prv)][vax_cols]
            print(f'{df_multiples_prv}')
        list_multiples_new = _repeated_ids(df_new)
        if list_multiples_new:
            print(f' VAERS_IDs: {list_multiples_new}')
            print(f"df_new.loc[ df_new.VAERS_ID.isin({list_multiples_new}) ][['VAERS_ID', 'VAX_LOT', 'VAX_MANU', 'VAX_TYPE', 'VAX_NAME']]")
            df_multiples_new = df_new.loc[df_new.VAERS_ID.isin(list_multiples_new)][vax_cols]
            print(f'{df_multiples_new}')
        exit(' Cannot continue with this discrepancy')
# Integrated from original: debug_breakpoint()
def debug_breakpoint( date, col, vids, debug_dates, debug_cols, debug_vids ):
    ''' Return 1 when the current date, column and VAERS_IDs all match the debug filters, else 0.

        An empty debug_dates or debug_cols matches anything. debug_vids has to be non-empty
        and share at least one id with vids for the result to be 1. '''
    date_matches = date in debug_dates or not debug_dates
    col_matches = col in debug_cols or not debug_cols
    vid_matches = bool(debug_vids) and any(dv in vids for dv in debug_vids)
    if date_matches and col_matches and vid_matches:
        return 1 # a line for setting a breakpoint if conditions match
    return 0
# Integrated from original: diff_context()
def diff_context( prv, new, col, vid, date_currently ):
''' Inputs: String column values like in SYMPTOM_TEXT. Code reviewer: Is there a better way below? Surely.
Reduce a previous (prv) and new value of one cell to just the parts that differ.
col is the column name; 'symptom_entries' is always compared item by item on the _|_ delimiter.
vid and date_currently are not used in the body (presumably kept for breakpoints - confirm).
Returns a 2-tuple (prv_only_text, new_only_text); ('', '') means the change is treated as trivial
(only punctuation, letter case, word/item order or repeats differ). '''
delim = ''
# A prevalent delimiter is only looked for in longer previous values.
if len(prv) > 200:
delim_count, delim = get_prevalent_delimiters(prv)
# Strip trivia (repeat markers, non-ASCII, runs of spaces, doubled quotes) from both sides first.
a = diff_simplify( prv )
b = diff_simplify( new )
''' Ignore punctuation and upper/lower case. [^a-zA-Z0-9|] clears everything not alphanumeric or pipe symbol (since VAX delim matters there) '''
if re.sub(r'[^a-zA-Z0-9|]', '', a.lower()) == re.sub(r'[^a-zA-Z0-9|]', '', b.lower()):
return( '', '' )
# Item-wise comparison: split both sides on the delimiter and keep only the items that the other side lacks.
# delim_count is only bound when len(prv) > 200, but delim is '' otherwise so `and` short-circuits before it is read.
if (delim and delim_count >= 5) or col == 'symptom_entries':
if col == 'symptom_entries': # these are lightning fast
delim = '_|_'
list_prv = a.split(delim)
list_new = b.split(delim)
only_prv = [x for x in list_prv if x not in list_new] # remove common
only_new = [x for x in list_new if x not in list_prv]
a = delim.join(only_prv)
b = delim.join(only_new)
''' Neutralizing punctuation changes needed here too because like 1334938 comma to pipe ... Hallucination| auditory ... wacky stuff by them '''
if re.sub(r'[^a-zA-Z0-9|]', '', a.lower()) == re.sub(r'[^a-zA-Z0-9|]', '', b.lower()):
return( '', '' ) # Ignore punctuation and upper/lower case
if delim == '_|_':
if sorted(only_prv) == sorted(only_new):
return ('', '') # Ignore reversed order
if a: a = delim + a + delim # endings
if b: b = delim + b + delim
return( a, b )
# Free-text comparison: repeatedly cut the longest run of words common to both sides.
context = 6 # way low for cases like 1589954 to rid the word 'appropriate' by itself at len(largest_common_string) > context
count_while = 0
continue_while = 1
#length_max = 50 deprecated
# At most 10 rounds; continue_while is cleared once no sufficiently long common run is left.
while count_while <= 9 and continue_while: # Whittling strings down, removing common substrings
count_while += 1 # Also consider if a in b: re.sub(a, '', b) ; a = '' and reverse for shorter no context
if not a or not b:
return( a, b )
if a.lower() == b.lower():
return( '', '' )
''' Important: If changes are the same with words just in a different order, skipping those.
Why? There are so many they would drown out changes. Applies for example to dose order changes, they are legion. '''
if delim:
if delim == '|' and sorted(set(a.split('|'))) == sorted(set(b.split('|'))): # vax fields, their delimiter is the pipe symbol, also removing duplicates
return( '', '' )
elif delim != '. ' and sorted(set(a.split(' '))) == sorted(set(b.split(' '))): # ignore reversed order and repeat words
return( '', '' )
''' To remove punctuation crudely: from string import punctuation ; ' '.join(filter(None, (word.strip(punctuation) for word in a.split()))) https://stackoverflow.com/a/15740656/962391
Ok, I give up. To avoid boatloads of problems, like commas replaced with pipe symbols in SYMPTOM_TEXT, going to compare/return just bare words for long strings
Avoid affecting things like 48.0
`punctuation` is just a string ... '!"#$%&\'()*+,-./:;<=>?@[\\]^_`{|}~'
punctuations = '!"#$%&()*+,-./:;<=>?@[\\]^_`{|}~' ... removing the single quote, note the 's', plural, variable name. '''
# Punctuation is stripped from the ends of each word only, so inner characters like the dot in 48.0 survive.
if len(a) > context and len(b) > context:
a = ' '.join(filter(None, (word.strip(''.join(punctuation)) for word in a.split()))) # consider not stripping ' .. ' instead but dicey
b = ' '.join(filter(None, (word.strip(''.join(punctuation)) for word in b.split())))
largest_string = longest_word_string(a, b)
if largest_string and (len(largest_string) > context): # and ((len(a) > length_max) or (len(b) > length_max)):
# Every occurrence of the common run becomes the ' .. ' placeholder on both sides.
a = re.sub(re.escape(largest_string), ' .. ', a, flags=re.IGNORECASE)
b = re.sub(re.escape(largest_string), ' .. ', b, flags=re.IGNORECASE)
if a.lower() == b.lower():
return ('', '')
# Keep only the words the other side lacks, first occurrence of each, original order.
a_list = [x.strip() for x in a.split()] ; b_list = [x.strip() for x in b.split()]
only_a = [x for x in a_list if x not in b_list]
only_b = [x for x in b_list if x not in a_list]
a_list = [] ; b_list = []
for x in only_a:
if x in a_list:
continue
a_list.append(x)
for x in only_b:
if x in b_list:
continue
b_list.append(x)
a = ' '.join(a_list) ; b = ' '.join(b_list)
if not a or not b:
return (a, b)
else:
continue_while = 0
# Final tidy-up of whatever is left.
a = re.sub(r'\s\s+', ' ', a) ; b = re.sub(r'\s\s+', ' ', b) # multiple spaces
a = a.strip() ; b = b.strip() # front and end spaces
# remove multiples like ' cms hcc .. cms hcc .. cms hcc .. cms hcc '
if ' .. ' in a or ' .. ' in b:
a_list = a.split('..') ; b_list = b.split('..') # not a complete job on 1371552 but good enough for now
a_list = [x.strip() for x in a_list] ; b_list = [x.strip() for x in b_list]
# drop '' and ' '
a_list = [x for x in a_list if re.search(r'\w', x)] ; b_list = [x for x in b_list if re.search(r'\w', x)]
if len(a_list) != len(set(a_list)) or len(b_list) != len(set(b_list)): # if multiples
only_a = [x for x in a_list if x not in b_list]
only_b = [x for x in b_list if x not in a_list]
a_list = [] ; b_list = []
for x in only_a:
if x in a_list:
continue
a_list.append(x)
for x in only_b:
if x in b_list:
continue
b_list.append(x)
a = ' .. '.join(a_list) ; b = ' .. '.join(b_list)
a = re.sub(r'\s\s+', ' ', a) ; b = re.sub(r'\s\s+', ' ', b) # necessary anymore after additions above?
if re.sub(r'[^a-zA-Z0-9|]', '', a.lower()) == re.sub(r'[^a-zA-Z0-9|]', '', b.lower()): # again
return( '', '' ) # Ignore punctuation and upper/lower case
return ( a, b )
# Integrated from original: diff_simplify()
def diff_simplify( x ):
    ''' Remove trivial stuff from string, etc

        Applies the substitutions below in order and strips the ends, so that two values
        differing only in these details compare equal. '''
    cleanups = (
        (r' `\^` ',          ''),  # the repeat sentence replacements that were entered
        (r'[^\x00-\x7F]+',   ' '), # unicode
        (r'\s\s+',           ' '), # multiple spaces with single to avoid trivial
        (r"''",              "'"), # doubled can mess things up like ... I''m ... in 1455992
    )
    for pattern, replacement in cleanups:
        x = re.sub(pattern, replacement, x)
    return x.strip() # front and end spaces
# Integrated from original: get_prevalent_delimiters()
def get_prevalent_delimiters(prv_str):
    ''' Find which of the candidate delimiters ('. ', '|', '; ', ' - ') occurs most in prv_str.

        Returns (count, delimiter): count is the highest occurrence count found, delimiter is
        the first candidate reaching it, or '' when that count is below 5. The pipe is not
        considered when the text contains '_|_'. '''
    best_count, best_delim = 0, ''
    for candidate in ('. ', '|', '; ', ' - '):
        if candidate == '|' and '_|_' in prv_str:
            continue # clumsy but these get crossed and '_|_' handled elsewhere
        hits = prv_str.count(candidate)
        if hits <= best_count:
            continue
        best_count = hits
        if hits >= 5:
            best_delim = candidate
    return best_count, best_delim
# Integrated from original: longest_word_string()
def longest_word_string(a, b):
    ''' Longest run of consecutive words that a and b share, compared ignoring case.

        Returned lower-cased with single spaces; '' when nothing is shared. Of several
        equally long runs the one starting earliest in a (then earliest in b) wins.
        https://stackoverflow.com/a/42882629/962391 '''
    words_a = [w.lower() for w in a.split()]
    words_b = [w.lower() for w in b.split()]
    count_a, count_b = len(words_a), len(words_b)
    best = []
    for start_a in range(count_a):
        for start_b in range(count_b):
            run = 0
            while (start_a + run < count_a
                   and start_b + run < count_b
                   and words_a[start_a + run] == words_b[start_b + run]):
                run += 1
            if run > len(best):
                best = words_b[start_b:start_b + run]
    return ' '.join(best).strip()
# Integrated from original: find_all_context()
def find_all_context(search_string, df_in, df_in_column, df_out_column):
    ''' Return the rows of df_in whose df_in_column contains search_string, with context.

    The match is case-insensitive and search_string is used as a regular
    expression. In the returned frame df_in_column holds the lower-cased text
    and df_out_column holds every hit with up to 20 characters either side,
    joined by ' ~~ '. An empty DataFrame is returned when nothing matches.

    df_in is no longer modified: the previous version overwrote the caller's
    column with its lower-cased form as a side effect. '''
    search_string = search_string.lower()
    lowered = df_in[df_in_column].str.lower()   # derived Series; df_in stays untouched
    hits = lowered.str.contains(search_string, na=False)
    if not hits.any():
        return pd.DataFrame()   # empty for len 0
    df_found = df_in.loc[hits].copy()
    df_found[df_in_column] = lowered[hits]      # same lower-cased output as before
    str_context = r'\b(.{0,20}' + search_string + r'.{0,20})\b'
    df_found[df_out_column] = df_found[df_in_column].str.findall(str_context).str.join(' ~~ ')
    return df_found
# Integrated from original: do_autodownload()
def do_autodownload():
    ''' Fetch VAERS files when the module-level `autodownload` switch is on.
    Data source: https://vaers.hhs.gov/data/datasets.html '''
    if autodownload:
        sys.path.insert(0, './vaers_downloader')
        # enable when the time comes # from VAERSFileDownloader import updateVAERSFiles
        # NOTE(review): while the import above stays disabled, updateVAERSFiles has to be
        # defined somewhere else in this module or this call raises NameError - confirm.
        updateVAERSFiles(
            needsUpdate = True,
            years = [2020],
            workingDirectory = os.getcwd()
        )
# =============================================================================
# TOP-LEVEL WORKER FUNCTIONS (must be module-level for multiprocessing.Pool)
# =============================================================================
def _read_file_for_pool(filename_use_chunks):
    """Pool worker for parallel CSV reading.

    Takes one (filename, use_chunks) tuple, because Pool.map passes a single
    argument, and returns whatever open_file_to_df produces for that file with
    console output switched off (doprint=0)."""
    path, chunked = filename_use_chunks
    return open_file_to_df(path, doprint=0, use_chunks=chunked)
def _dedupe_single_for_pool(vid_content):
    """Stateless symptom-dedup worker. Returns (vid, new_content, count, total_bytes).

    vid_content is a (vid, content) pair. vid is coerced to int and content to str.
    One leading and one trailing double quote are stripped before splitting on the
    prevalent delimiter. Every repeat of a segment longer than 40 characters is
    replaced by the marker '`^`'; count is the number of replacements and
    total_bytes the summed length of the replaced segments.

    vaers36 data-fidelity hardening: on any error path or empty result when input had
    content, return the original content so we never silently drop SYMPTOM_TEXT etc.
    (The map+results assembly in the caller also has an explicit fallback.)

    NOTE(review): the no-delimiter and no-duplicate paths return the text with its
    outer quotes still attached, whereas symptoms_dedupe_repeat_sentences_each returns
    the quote-stripped text in those cases - confirm which is wanted.
    """
    vid, content = vid_content
    vid = int(vid)
    orig_content = str(content)
    content = re.sub(r'^"', '', orig_content)
    content = re.sub(r'"$', '', content)
    try:
        delim = get_prevalent_delimiters(content)[1]
        if not delim:
            return vid, orig_content, 0, 0
        # delim is a literal string, so str.split matches re.split(re.escape(delim), ...).
        list_in = content.split(delim)
        if len(list_in) == len(set(list_in)):
            return vid, orig_content, 0, 0
        count_of_replacements = 0
        total_bytes = 0
        list_out = []
        # Segments already seen. The '`^`' marker is 3 characters long, so it can never
        # equal a segment longer than 40; this set answers the same question as
        # scanning list_out, without the quadratic cost.
        seen = set()
        for line in list_in:
            if len(line) > 40 and line in seen:
                count_of_replacements += 1
                total_bytes += len(line)
                list_out.append('`^`')
            else:
                seen.add(line)
                list_out.append(line)
        cleaned = delim.join(list_out)
        # Never return empty string if we started with content
        if not cleaned and orig_content:
            return vid, orig_content, 0, 0
        return vid, cleaned, count_of_replacements, total_bytes
    except Exception:
        # On any failure (huge text, weird delimiters after restore, regex, etc.)
        # preserve the original so the final output does not lose the report text.
        return vid, orig_content if orig_content else '', 0, 0
def _join_symptom_rows_chunk(df_chunk):
    """Worker: turn each row of a DataFrame chunk into one '_|_'-joined string.

    Every cell is converted to str, the cells of each row are sorted, and the
    sorted cells are joined with '_|_'. Taking a DataFrame rather than a
    pre-sorted array lets the costly conversion and sort run inside the worker
    processes instead of serially in the main process."""
    cells = df_chunk.values.astype(str)   # astype returns a fresh array
    cells.sort(axis=1)                    # so the in-place sort cannot touch the input
    return list(map('_|_'.join, cells))
def _agg_vax_chunk(chunk_df):
"""Worker: groupby-aggregate and join a VAERS_ID slice of the vax dataframe.
Joining here avoids a second single-core map pass after pd.concat."""
return chunk_df.groupby('VAERS_ID', sort=False).agg(list).map('|'.join)
def _merge_parallel(left, right, on='VAERS_ID', how='left'):
    """Merge left with right, splitting left across NUM_CORES threads when worthwhile.

    Threads share memory so right is never pickled - safe for large DataFrames.

    A plain single pd.merge is used when NUM_CORES <= 1, when left has fewer than
    NUM_CORES * 500 rows, or when `how` is anything other than 'left' or 'inner'.
    Only for those two modes does each output row depend on exactly one left row,
    so only they can be computed chunk by chunk; with e.g. 'outer' or 'right',
    every chunk would repeat the unmatched rows of right.

    Returns the merged DataFrame; the threaded path renumbers the index from 0.
    """
    row_count = len(left)
    chunkable = how in ('left', 'inner')
    if NUM_CORES <= 1 or row_count < NUM_CORES * 500 or not chunkable:
        return pd.merge(left, right, on=on, how=how)
    # Positional row slices; avoids np.array_split on a DataFrame.
    bounds = [int(b) for b in np.linspace(0, row_count, NUM_CORES + 1)]
    splits = [left.iloc[start:stop] for start, stop in zip(bounds[:-1], bounds[1:])]
    with ThreadPoolExecutor(max_workers=NUM_CORES) as ex:
        chunks = list(ex.map(lambda chunk: pd.merge(chunk, right, on=on, how=how), splits))
    return pd.concat(chunks, ignore_index=True)
def _groupby_syms_chunk(df_chunk):
"""Reduce symptom rows: groupby VAERS_ID and join all values with _|_ delimiter.
Runs in a worker process so df_chunk must contain only rows for a disjoint set
of VAERS_IDs (guaranteed by the caller's array_split on unique IDs)."""
return (df_chunk[['VAERS_ID', 'symptom_entries']]
.astype(str)
.groupby('VAERS_ID')
.agg(list)
.map('_|_'.join)
.reset_index())
def _serialize_csv_chunk(chunk):
    """Render one DataFrame chunk as CSV text, omitting both the header row and the index."""
    csv_text = chunk.to_csv(header=False, index=False)
    return csv_text
# Module-level global for fork-based parallel CSV serialization.
# Set in write_to_csv() before creating the Pool; forked workers inherit via CoW.
_g_df_to_serialize = None   # DataFrame that _serialize_csv_slice reads its rows from
def _serialize_csv_slice(idx_pair):
    """Render rows [start, stop) of the module-global DataFrame as CSV text.

    idx_pair is a (start, stop) tuple of positional row bounds. Only that small
    tuple travels over IPC; the frame is read from _g_df_to_serialize, which
    forked workers inherit from the parent. No header row or index is written."""
    first_row, end_row = idx_pair
    rows = _g_df_to_serialize.iloc[first_row:end_row]
    return rows.to_csv(index=False, header=False)
# Globals for parallel astype(str)+fillna and element-wise row comparison in compare().
_g_df_pair: list = []   # [df_prv, df_new] - set before fork, inherited by workers
_g_pb_cmp = None        # prv DataFrame slice for row comparison
_g_nb_cmp = None        # new DataFrame slice for row comparison
def _astype_str_fillna_pair_chunk(args):
    """Convert a column subset of one of the two frames in _g_df_pair to str, then fillna('').

    args = (df_index, col_chunk): df_index selects the frame (0 or 1) and
    col_chunk is the list of column names to convert. Returns the converted
    sub-frame.

    NOTE(review): astype(str) runs before fillna(''); where astype(str) already
    renders missing values as text, the fillna has nothing left to fill - confirm
    this order is intended."""
    which_df, columns = args
    subset = _g_df_pair[which_df][columns]
    return subset.astype(str).fillna('')
def _compare_rows_col_chunk(col_chunk):
    """Row-wise equality of the _g_pb_cmp and _g_nb_cmp frames over the given columns.

    Returns a boolean numpy array with one entry per row: True where every
    column listed in col_chunk holds equal values in both frames."""
    prv_part = _g_pb_cmp[col_chunk]
    new_part = _g_nb_cmp[col_chunk]
    cells_equal = prv_part == new_part
    return cells_equal.all(axis=1).to_numpy()
def _pool_astype_str(df0, df1, cols1=None):
    """Convert two DataFrames to str+fillna using a forked Pool (true multi-core).

    Workers read both frames from the module-global _g_df_pair; only the small
    column-subset results are pickled back.

    cols1: restrict df1 to these columns (e.g. ['VAERS_ID', 'symptom_entries']);
           all columns of df1 when None.
    Returns (df0_str, df1_str).

    Changes from the earlier version:
    - _g_df_pair is cleared in a `finally`, so the two frames are released even
      when the pool raises.
    - Column lists are chunked with plain list slicing, so labels keep their
      Python type instead of being coerced through a NumPy array.
    - A side with no columns is converted directly instead of crashing on
      np.array_split(..., 0) / pd.concat([]).

    NOTE(review): workers only see _g_df_pair if the pool start method is fork -
    confirm for the target platform.
    """
    global _g_df_pair
    c0 = df0.columns.tolist()
    c1 = list(cols1) if cols1 is not None else df1.columns.tolist()

    def _col_chunks(cols):
        # Near-equal consecutive chunks; the first `extra` chunks get one more item.
        if not cols:
            return []
        parts = max(1, min(NUM_CORES, len(cols)))
        base, extra = divmod(len(cols), parts)
        chunks, start = [], 0
        for i in range(parts):
            stop = start + base + (1 if i < extra else 0)
            chunks.append(cols[start:stop])
            start = stop
        return chunks

    chunks0 = _col_chunks(c0)
    chunks1 = _col_chunks(c1)
    tasks = [(0, ch) for ch in chunks0] + [(1, ch) for ch in chunks1]
    res = []
    _g_df_pair = [df0, df1]
    try:
        if tasks:
            with Pool(max(1, min(NUM_CORES, len(tasks)))) as pool:
                res = pool.map(_astype_str_fillna_pair_chunk, tasks)
    finally:
        _g_df_pair = []
    n0 = len(chunks0)
    out0 = pd.concat(res[:n0], axis=1) if chunks0 else df0[c0].astype(str).fillna('')
    out1 = pd.concat(res[n0:], axis=1) if chunks1 else df1[c1].astype(str).fillna('')
    return out0, out1
# Module-level globals used by _build_col_arg_global.
# Set in compare() before forking the build-pool; forked workers inherit via CoW.
_g_prv_arrs: dict = {}    # column name -> array of previous values
_g_new_arrs: dict = {}    # column name -> array of new values
_g_ids_np_global = None   # array of ids, indexed with the same mask as the value arrays
_g_col_date = None        # passed through unchanged as the 5th tuple element
_g_col_test = None        # passed through unchanged as the 6th tuple element
def _build_col_arg_global(col):
    """Pick out the changed rows of one column from the module-level arrays (no IPC pickling).

    A row counts as changed when its previous and new values differ, unless both
    are null. Returns (col, ids, prv_values, new_values, date, test), where the
    three arrays hold the changed rows only and date/test are the current
    _g_col_date and _g_col_test."""
    before = _g_prv_arrs[col]
    after = _g_new_arrs[col]
    missing_in_both = pd.isnull(before) & pd.isnull(after)
    changed = (before != after) & ~missing_in_both
    return (
        col,
        _g_ids_np_global[changed],
        before[changed],
        after[changed],
        _g_col_date,
        _g_col_test,
    )
def _compare_col_worker(args):
"""
Process one column's changes.
args is the 6-tuple (col, ids_np, prv_arr, new_arr, date_currently, use_test_cases_flag),
the same shape that _build_col_arg_global returns.
Returns (col, row_updates, bulk_blank, col_stats, print_lines)
row_updates : list of (vid, edits_add, changes_note, col_val_or_None)
bulk_blank : None | {'vids': [...], 'prv_vals': {vid: val}, 'cells_emptied': int,
'message': str, 'col': str, 'date': str}
col_stats : {'cells_edited': int, 'cells_emptied': int,
'trivial_changes_ignored': int, 'col_count': int}
print_lines : list of strings
Nothing is printed here; report lines are collected in print_lines and returned.
"""
(col, ids_np, prv_arr, new_arr, date_currently, use_test_cases_flag) = args
row_updates = []
bulk_blank = None
col_stats = {'cells_edited': 0, 'cells_emptied': 0,
'trivial_changes_ignored': 0, 'col_count': 0}
print_lines = []
# FORM_VERS is not compared at all when the test-cases flag is set.
if use_test_cases_flag and col == 'FORM_VERS':
return col, row_updates, bulk_blank, col_stats, print_lines
# Rebuild minimal DataFrames for this column from pre-filtered numpy arrays
col_prv = col + '_prv'
col_new = col + '_new'
# Both frames get the same sort and a fresh 0..n-1 index, which the position-wise .ne() below relies on.
df_slice_prv = pd.DataFrame({'VAERS_ID': ids_np, col: prv_arr}).sort_values('VAERS_ID').reset_index(drop=True)
df_slice_new = pd.DataFrame({'VAERS_ID': ids_np, col: new_arr}).sort_values('VAERS_ID').reset_index(drop=True)
vids_changed_this_col = df_slice_new.loc[
df_slice_new[col].ne(df_slice_prv[col])
].VAERS_ID.to_list()
if not vids_changed_this_col:
return col, row_updates, bulk_blank, col_stats, print_lines
# Merge on VAERS_ID; the suffixes turn the shared column name into col_prv / col_new.
df_three_columns = pd.merge(
df_slice_prv.loc[df_slice_prv.VAERS_ID.isin(vids_changed_this_col)],
df_slice_new.loc[df_slice_new.VAERS_ID.isin(vids_changed_this_col)],
on='VAERS_ID', suffixes=('_prv', '_new')
)
# Bulk-blanking shortcut (200+ rows cleared)
# Rows going from non-empty to '' are reported once via bulk_blank and dropped from the per-row pass.
df_3_made_blank = df_three_columns.loc[
df_three_columns[col_prv].ne('') & df_three_columns[col_new].eq('')
]
if len(df_3_made_blank) >= 200:
vids_list = df_3_made_blank.VAERS_ID.to_list()
prv_vals = dict(zip(df_3_made_blank.VAERS_ID, df_3_made_blank[col_prv]))
total_prv_bytes = df_3_made_blank[col_prv].astype(str).map(len).sum()
message = (f"{len(df_3_made_blank)} rows with {total_prv_bytes} total bytes MADE BLANK")
col_padded = (' ' * (20 - len(col))) + col
print_lines.append(f' {len(df_3_made_blank):>10} {col} blanked out, noting in bulk, omitting their VAERS_IDs')
print_lines.append(f'{"":>10} {col_padded} {"":>8} {message:>70} <> [] ')
col_stats['cells_emptied'] += len(vids_list)
bulk_blank = {'vids': vids_list, 'prv_vals': prv_vals,
'cells_emptied': len(vids_list),
'message': message, 'col': col, 'date': date_currently}
df_three_columns = df_three_columns.loc[~df_three_columns.VAERS_ID.isin(vids_list)]
if not len(df_three_columns):
return col, row_updates, bulk_blank, col_stats, print_lines
# Trivial-change filter
# Rows that compare equal after the normalisation below are dropped and counted as trivial.
len_before = len(df_three_columns)
if 'DATE' in col:
# DATE columns: the same leading-date pattern is removed from both values before comparing.
df_three_columns = df_three_columns.loc[
df_three_columns[col_prv].astype(str).str.replace(
r'^(?:0*\d\/\d\d|\d\d\/0*\d|0*\d\/0*\d)\/\d{4}', '', regex=True)
!= df_three_columns[col_new].astype(str).str.replace(
r'^(?:0*\d\/\d\d|\d\d\/0*\d|0*\d\/0*\d)\/\d{4}', '', regex=True)
]
else:
# Other columns: every non-word character and underscore is removed before comparing.
df_three_columns = df_three_columns.loc[
df_three_columns[col_prv].astype(str).str.replace(r'[\W_]', '', regex=True)
!= df_three_columns[col_new].astype(str).str.replace(r'[\W_]', '', regex=True)
]
count_trivial = len_before - len(df_three_columns)
if count_trivial:
cellword = 'cell' if count_trivial == 1 else 'cells'
col_padded = (' ' * (20 - len(col))) + col
print_lines.append(f'{col_padded} {count_trivial:>7} {count_trivial} {cellword} of trivial non-letter differences ignored')
col_stats['trivial_changes_ignored'] += count_trivial
if not len(df_three_columns):
return col, row_updates, bulk_blank, col_stats, print_lines
df_three_columns = df_three_columns.fillna('')
# Group identical (prv, new) pairs: the transform joins the unique values of each column with a space,
# so VAERS_ID becomes a space-separated string of every id that shares the same change.
try:
df_uniq = df_three_columns.groupby([col_prv, col_new])[
['VAERS_ID', col_prv, col_new]
].transform(lambda x: ' '.join(x.astype(str).unique()))
except Exception:
# NOTE(review): any failure here ends the column silently with whatever was collected so far.
return col, row_updates, bulk_blank, col_stats, print_lines
df_uniq = df_uniq.drop_duplicates(subset=df_uniq.columns).copy()
df_uniq['VAERS_IDs'] = df_uniq.VAERS_ID.str.split(' ')
df_uniq = df_uniq[['VAERS_IDs', col_prv, col_new]]
df_uniq['len'] = df_uniq[col_new].apply(len)
# NOTE(review): the sort keys come from a set, so their order is not fixed - confirm whether the report order matters.
df_uniq = df_uniq.sort_values(by=list(set(df_uniq.columns) - {'VAERS_IDs'}))
blank_brackets = '[]'
col_padded = (' ' * (20 - len(col))) + col
for _, row in df_uniq.iterrows():
vids_list = sorted([int(x) for x in row['VAERS_IDs']])
vid = vids_list[0]
val_prv = row[col_prv]
val_new = row[col_new]
# The *_ori copies keep the untouched values; val_prv / val_new are reassigned further down.
val_prv_ori = val_prv
val_new_ori = val_new
# newly_cut: value went from non-empty to empty and carries no earlier 'cut_' marker.
# continuing_cut: previous value already carries 'cut_' and the new value is still empty.
# restored: previous value carries 'cut_' and starts with the new, non-empty value.
newly_cut = 1 if val_prv and (not val_new) and ('cut_' not in val_prv) else 0
continuing_cut = val_prv if 'cut_' in val_prv and not val_new else ''
restored = ''
if 'cut_' in val_prv and val_new and val_prv.startswith(val_new):
restored = ' [restored]'
if newly_cut:
col_stats['cells_emptied'] += len(vids_list)
# For these columns a trailing '.0' is removed, so 5.0 and 5 compare equal.
if col in ['AGE_YRS', 'CAGE_YR', 'CAGE_MO', 'HOSPDAYS', 'NUMDAYS']:
val_new = re.sub(r'\.0$', '', str(val_new))
val_prv = re.sub(r'\.0$', '', str(val_prv))
if val_prv and val_new:
if re.sub(r'\W', '', val_prv) == re.sub(r'\W', '', val_new):
continue
# diff_context takes both values and hands back a (prv, new) pair used for the note and the report line.
val_prv, val_new = diff_context(val_prv, val_new, col, vid, date_currently)
elif not val_prv and not val_new:
continue
if not continuing_cut:
col_stats['cells_edited'] += len(vids_list)
col_stats['col_count'] += len(vids_list)
# Report-line fields: a single id is shown in the vid column, several ids as a count plus list.
vids_len_to_print = len(vids_list) if len(vids_list) > 1 else ''
vids_list_to_print = [int(x) for x in vids_list] if len(vids_list) > 1 else ''
vid_to_print = vid if len(vids_list) == 1 else ''
val_prv_print = val_prv
val_new_print = val_new
try:
if ' cut_' in val_prv_ori:
if continuing_cut:
val_prv_print = val_prv_ori
elif restored:
val_prv_print = "''"
val_prv_print = re.sub(r' cut_.*$', '', val_prv_print)
# A long previous value whose new value is empty is shown as an excerpt plus a count of the remaining characters.
if not val_new_print and len(val_prv_print) > 100:
prv_excerpt = re.sub(r'^(.{1,40}.*?)\b.*', r'\1', val_prv_print) + f' ... ~{len(val_prv_print) - 40} more'
if not continuing_cut:
print_lines.append(f'{vids_len_to_print:>10} {col_padded} {vid_to_print:>8} {prv_excerpt:>70} <> {val_new_print}{restored} {vids_list_to_print}')
else:
if not continuing_cut:
print_lines.append(f'{vids_len_to_print:>10} {col_padded} {vid_to_print:>8} {val_prv_print:>70} <> {val_new_print}{restored} {vids_list_to_print}')
except Exception:
# Fallback: runs of non-ASCII characters become '_' and the line is appended anyway.
val_prv_print = re.sub(r'[^\x00-\x7F]+', '_', str(val_prv_print))
val_new_print = re.sub(r'[^\x00-\x7F]+', '_', str(val_new_print))
print_lines.append(f'{vids_len_to_print:>10} {col_padded} {vid_to_print} {val_prv_print:>70} <> {val_new_print}{restored} {vids_list_to_print}')
# One row_updates entry per VAERS_ID that shares this (prv, new) pair.
for vid in vids_list:
edits_add = 0
val_to_keep = val_prv if (len(val_prv) and not len(val_new_ori)) else val_new_ori
the_changes_note = ''
if newly_cut:
# The removed value is kept, tagged with the date it was cut.
edits_add = 1
val_to_keep = f'{val_to_keep} cut_{date_currently} '
the_changes_note = f'{col} cut_{date_currently} '
elif continuing_cut:
val_to_keep = val_prv_ori
else:
if val_new and 'cut_' in val_prv_ori:
val_prv_c = re.sub(r' cut_.*$', '', val_prv)
if val_prv_c == val_new:
val_prv = "''"
if not val_prv and not val_new:
edits_add = 0
the_changes_note = ''
else:
# Empty sides are written as '[]' in the note.
if not val_prv: val_prv = '[]'
if not val_new: val_new = '[]'
edits_add = 1
the_changes_note = f'{col} {date_currently}: {val_prv} <> {val_new} '
row_updates.append((vid, edits_add, the_changes_note,
val_to_keep if val_to_keep else None))
return col, row_updates, bulk_blank, col_stats, print_lines
# Integrated from original: symptoms_dedupe_repeat_sentences()
def symptoms_dedupe_repeat_sentences(_data):
    ''' Remove repeat sentences in SYMPTOM_TEXT using all CPU cores.

    Works on a copy of _data and returns it. Each (VAERS_ID, SYMPTOM_TEXT) pair is
    sent to _dedupe_single_for_pool; the dedupe counters in `stats` and the
    `writeups_deduped` registry are updated from the results.

    Fix: results are written back by row position. Pool.map and Pool.imap return
    results in input order, so result i belongs to row i. The earlier version
    rebuilt the column through a dict keyed by int(VAERS_ID) and Series.map, which
    finds no key at all when the VAERS_ID column is not of int dtype (every row
    then fell through to the restore guard, so nothing was deduped) and which
    collapses rows that share a VAERS_ID onto the last result.
    The restore guard is kept as a last line of defence. '''
    print(f' Repeat sentence removal in SYMPTOM_TEXT ({NUM_CORES} cores)')
    _data = _data.copy()
    if len(_data):
        pairs = list(zip(_data['VAERS_ID'].tolist(), _data['SYMPTOM_TEXT'].tolist()))
        workers = max(1, NUM_CORES)   # guard against a zero or negative core count
        chunksize = max(1, len(pairs) // (workers * 4))
        with Pool(workers) as pool:
            if SHOW_PROGRESS:
                results = list(tqdm(
                    pool.imap(_dedupe_single_for_pool, pairs, chunksize=chunksize),
                    total=len(pairs), desc=" Deduping SYMPTOM_TEXT", unit="report", leave=True
                ))
            else:
                results = pool.map(_dedupe_single_for_pool, pairs, chunksize=chunksize)
        new_texts = []
        for vid, new_content, count_replacements, total_bytes in results:
            new_texts.append(new_content)
            if count_replacements and vid not in writeups_deduped:
                stats['dedupe_bytes'] += total_bytes
                stats['dedupe_reports'] += 1
                stats['dedupe_count'] += count_replacements
                writeups_deduped[vid] = 1
            if total_bytes > stats['dedupe_max_bytes']:
                stats['dedupe_max_bytes'] = total_bytes
                stats['dedupe_max_vid'] = vid
        # Data-fidelity safety: fall back to the pre-dedupe text for any row that
        # would go blank although it had content before.
        pre_sym = _data['SYMPTOM_TEXT'].copy()
        _data['SYMPTOM_TEXT'] = new_texts   # positional: result i belongs to row i
        blank_after = _data['SYMPTOM_TEXT'].isna() | (_data['SYMPTOM_TEXT'] == '')
        had_content = pre_sym.notna() & (pre_sym != '')
        restore_mask = blank_after & had_content
        if restore_mask.any():
            _data.loc[restore_mask, 'SYMPTOM_TEXT'] = pre_sym[restore_mask]
            print(f" [vaers36 data-fidelity] Restored SYMPTOM_TEXT for {int(restore_mask.sum())} VIDs that the parallel dedupe map would have blanked.")
        _data['SYMPTOM_TEXT'] = _data['SYMPTOM_TEXT'].fillna('')
    print()
    to_print = f"{stats['dedupe_count']:>10} SYMPTOM_TEXT field repeat sentences deduped in "
    to_print += f"{stats['dedupe_reports']} reports, max difference {stats['dedupe_max_bytes']} bytes in VAERS_ID {stats['dedupe_max_vid']}"
    print(to_print) ; print()
    return _data
# Integrated from original: symptoms_dedupe_repeat_sentences_each()
def symptoms_dedupe_repeat_sentences_each(series_vid_sym):
    ''' Remove repeat sentences in one VAERS SYMPTOM_TEXT field (serial, per-row variant).

    series_vid_sym is read by position: item 0 is the VAERS_ID, item 1 the text.
    The text is split on its prevalent delimiter (see get_prevalent_delimiters)
    and every repeat of a segment longer than 40 characters is replaced by the
    marker `^`. Returns the resulting text; when there is no prevalent delimiter
    or no duplicate segment, the quote-stripped text is returned as is.
    Updates the dedupe counters in `stats` and the `writeups_deduped` registry.
    Originated at https://stackoverflow.com/a/40353780/962391

    An alternative regex-based implementation used to follow the final return.
    It could never run and was flagged by its author as flawed, so it has been
    removed. '''
    # Positional access via .iloc: integer keys on a Series are being deprecated as positions.
    vid = int(series_vid_sym.iloc[0])       # 2025 force int, is this a mistake? TODO.
    content = str(series_vid_sym.iloc[1])   # sometimes not string
    # Some of the input files filled in by various people (at CDC) evidently might have gone
    # through some processing in Excel. As a result, fields have double quotes injected;
    # they have to be removed or big trouble, takes time but required.
    content = re.sub(r'^"', '', content)    # not certain about these
    content = re.sub(r'"$', '', content)
    delim = get_prevalent_delimiters(content)[1]
    if not delim:
        return content
    # delim is a literal string, so str.split matches re.split(re.escape(delim), ...).
    list_in = content.split(delim)
    if len(list_in) == len(set(list_in)):
        return content   # no dupes
    count_of_replacements = 0
    total_bytes = 0
    list_out = []
    # Segments already seen. The 3-character marker can never equal a segment longer
    # than 40, so this set answers the same question as scanning list_out.
    seen = set()
    for line in list_in:
        if len(line) > 40 and line in seen:   # only dedupe longish lines
            count_of_replacements += 1
            total_bytes += len(line)
            if vid not in writeups_deduped:
                stats['dedupe_bytes'] += len(line)
            line = '`^`'
        else:
            seen.add(line)
        list_out.append(line)
    if total_bytes > stats['dedupe_max_bytes']:
        stats['dedupe_max_bytes'] = total_bytes
        stats['dedupe_max_vid' ] = vid
    content = delim.join(list_out)
    # stats
    if vid not in writeups_deduped:     # count each report only once, first time seen
        stats['dedupe_reports'] += 1    # count of reports that ever had any repeat sentences
        writeups_deduped[vid] = 1       # for unique vid tracking
    if count_of_replacements:
        stats['dedupe_count'] += count_of_replacements
    return content   # modified
# Integrated from original: get_next_date()
def get_next_date():
''' Next date has to be the next flattened needed to be done. Compare uses flattened files.
Reads the module-level `files` and `date_ceiling`. Returns a date string from the input dates,
or None on the paths that print a "nothing more to do" / ceiling message. '''
# TODO: This has changed, there's surely some dead code here now that would never be hit, to be removed. Also find ... cheap trick, this is part of it. Slopppy ...
# Input dates still pending: everything in files['input']['date'] that is not listed in files['done'].
dates_input = sorted( [x for x in files['input' ]['date'] if x not in files['done']]) # added 'done' in 2025
dates_changes = sorted(files['changes']['date'])
# NOTE(review): dates_input[0] raises IndexError when no pending input date is left - confirm callers rule that out.
if not files['flattened']['keyval']: # first run
return dates_input[0]
# Comparing to inputs (to do)
if dates_changes:
# Next candidate is the first pending input date later than the latest changes date.
high_changes_done = dates_changes[-1]
changes_next_candidates = [x for x in dates_input if x > high_changes_done]
if changes_next_candidates:
date_next = changes_next_candidates[0]
if date_ceiling and ( date_next > date_ceiling ):
print(f'\n\n\n\n\t\t\t date_ceiling is set and was reached: date_next {date_next} > {date_ceiling} \n\n\n\n')
return None
return date_next
else:
''' Save final output as two, split in half, due to Excel row limit of about 1.048 million per sheet. '''
# dead no? if len(df_flat_prv):
# dead no? print() ; print(' save_multi_csv(high_changes_done, df_flat_prv)') ; print()
# dead no? save_multi_csv(high_changes_done, df_flat_prv)
if date_ceiling:
print(f'\n\n\n\n\t\t\t date_ceiling is {date_ceiling}, highest changes: {high_changes_done} \n\n\n\n')
else:
print('\n No more input files to process')
return None
else:
''' Confusions. Whole thing needs a lot of conditions applied.
If there are no changes files, can still be consolidated and flattened.
If no changes, just return the first as next? No, that's a loop.
'''
if files['flattened']['date']:
if dates_input[0] in files['flattened']['date']:
# Initiating, copy it
# The flattened file of the first pending date is copied under dir_compared as <date>_VAERS_FLATFILE.csv.
date_currently = dates_input[0]
print_date_banner(date_currently)
file_for_copy_original = dir_compared + '/' + date_currently + '_VAERS_FLATFILE.csv'
print(
f' Due to first drop, creating from flattened: {file_for_copy_original}')
print()
file_flattened = files['flattened']['keyval'][date_currently]
shutil.copy(file_flattened, file_for_copy_original) # copyfile() doesn't include permissions so it can result in problems
# The file has to be read for VAERS_IDs for ever_covid to be accurate
print() ; print(f' Reading original for ever_covid bookkeeping')
df = open_file_to_df(file_flattened, doprint=1)
# Initial row in stats file when only one compare/changes file
stats_initialize (date_currently) # per drop, then a totals row is calculated
do_never_ever( df.VAERS_ID.to_list(), date_currently, 'get_next_date on file_flattened' )
do_ever_covid( sorted(set(df.VAERS_ID.to_list())) )
stats_resolve(date_currently)
# NOTE(review): dates_input[1] raises IndexError when only one pending input date exists.
return dates_input[1] # NEXT ONE
return dates_input[0]
# Input dates with no consolidated file yet; the earliest one is returned.
consolidated_not_done = sorted( set(files['input']['date']) - set(files['consolidated']['date']) )
if consolidated_not_done:
date_todo = sorted(consolidated_not_done)[0]
print(f' Consolidation to do: {date_todo}')
return date_todo
# Input dates with no flattened file yet; the earliest one is returned.
flattened_not_done = sorted( set(files['input']['date']) - set(files['flattened']['date']) )
if flattened_not_done:
date_todo = sorted(flattened_not_done)[0]
print(f' Flattening to do: {date_todo}')
return date_todo
latest_flattened_done = sorted(files['flattened']['date'])[-1] if files['flattened']['date'] else ''
dates_list = [x for x in dates_input if x > latest_flattened_done]
# NOTE(review): dates_list[0] is read before the length check on the next line, so an empty
# dates_list raises IndexError here and the 'No more files to process' exit is never reached.
date_next = dates_list[0]
if len(dates_list) == 0: # No more to process
print()
exit(' No more files to process')
elif date_ceiling and ( date_next > date_ceiling ):
exit(f'\n\n\n\n\t\t\t date_ceiling is set and was reached: date_next {date_next} > {date_ceiling} \n\n\n\n')
else:
return date_next
# Integrated from original: more_to_do()
def more_to_do():
''' Return 1 or 0 on whether any more input files process.
Refreshes `files` through files_populate_information() first. May load df_flat_1 from the latest
changes file and call save_multi_csv, and sets the module-level elapsed_drop timestamp before returning 1. '''
global elapsed_drop, df_flat_1
files_populate_information()
if not files['changes']['date']: # None done yet
return 1
# Chained comparison: the input, flattened, consolidated and changes date lists must all be equal.
if files['input']['date'] == files['flattened']['date'] == files['consolidated']['date'] == files['changes']['date']:
if files['changes']['date'][-1] >= files['input']['date'][-1]:
date_currently = files['changes']['date'][-1]
if df_flat_1 is not None and len(df_flat_1):
pass
#save_multi_csv(date_currently, df_flat_1) # more than one csv due to Excel row limit
else:
# No usable frame available: reload it from the changes file of the latest changes date.
prv_date = files['changes']['date'][-1]
filename = files['changes']['keyval'][prv_date]
print(f'Patching for creation of split changes file _A and _B, no more to do case unusual, using {filename}')
df_flat_1 = open_file_to_df(filename, doprint=0)
print() ; print(' save_multi_csv(date_currently, df_flat_1)') ; print()
save_multi_csv(date_currently, df_flat_1) # more than one csv due to Excel row limit
print()
print(f" No more to do, last set {files['changes']['date'][-1]} >= {files['input']['date'][-1]} done")
return 0
elapsed_drop = _time.time() # new start marker for total time on each drop
return 1
# Integrated from original: print_date_banner()
def print_date_banner(this_drop_date, file_num=None, total_input_files=None):
    ''' Print a banner announcing the drop date that is processed next.

    When both file_num and total_input_files are given, a reverse-video progress
    note (file number, files left, percent done) is appended to the date line.
    The percentage is 0 when total_input_files is 0. '''
    _INV = '\033[7m'   # reverse-video (invert fg/bg)
    _RST = '\033[0m'   # reset
    if file_num is None or total_input_files is None:
        progress_str = ''
    else:
        files_left = total_input_files - file_num
        if total_input_files:
            pct = 100 * file_num / total_input_files
        else:
            pct = 0
        progress_str = f' {_INV} File {file_num}/{total_input_files} — {files_left} left ({pct:.0f}%) {_RST}'
    rule = '= = = = = = = = = = = = = = = = = = = = = = = = = = = = = = '
    for banner_line in ('', rule, f' Next date {this_drop_date}{progress_str}', rule, ''):
        print(banner_line)
# Integrated from original: consolidate()
def consolidate(files_date_marker):
'''
= = = = = = = = = = = = = = = = = = = = = = = = = = = = = =
Consolidate -- All in one file but still multiple rows per VAERS_ID sometimes.
= = = = = = = = = = = = = = = = = = = = = = = = = = = = = = '''
global df_vax, df_data, df_syms_flat, ever_covid, ever_any, covid_earliest_vaers_id
print() ; print(f' Consolidation')
files_populate_information()
''' Consolidate only if the file doesn't already exist '''
if files_date_marker in files['consolidated']['date']:
if files_date_marker in files['consolidated']['keyval']: # already done, use it
print(f" Consolidation already done: {files['consolidated']['keyval'][files_date_marker]}")
df_vax = pd.DataFrame() # avoid picking up a previous consolidation when flattening
return
else:
print(f" ERROR: Expected {files_date_marker} in files['consolidated']['keyval']")
if files_date_marker in files['flattened']['date']:
print(f' Skipping consolidation because flattened for {files_date_marker} already exists')
return
''' Combine all data, vax and symptoms files as one
(if not already a file like 2020-12-25_VAERS_CONSOLIDATED.csv in which case just do the treament) '''
print(f' Reading VAERS files across {NUM_CORES} cores: *VAERSDATA.csv *VAERSVAX.csv *VAERSSYMPTOMS.csv')
vax_glob = glob.glob(dir_working + '/' + '*VAERSVAX.csv')
syms_glob = glob.glob(dir_working + '/' + '*VAERSSYMPTOMS.csv')
data_glob = glob.glob(dir_working + '/' + '*VAERSDATA.csv')
# Read all three file groups with a single Pool so every core is busy
all_input_files = vax_glob + syms_glob + data_glob
_workers = min(NUM_CORES, max(1, len(all_input_files)))
_args = [(f, False) for f in all_input_files]
with Pool(_workers) as pool:
if SHOW_PROGRESS:
_all_dfs = list(tqdm(
pool.imap(_read_file_for_pool, _args),
total=len(all_input_files), desc=" Reading VAERS files", unit="file", leave=True
))
else:
_all_dfs = pool.map(_read_file_for_pool, _args)
_vax_set = set(vax_glob); _syms_set = set(syms_glob); _data_set = set(data_glob)
def _concat_group(flist):
dfs = [df for f, df in zip(all_input_files, _all_dfs)
if f in flist and df is not None and len(df) > 0]
return pd.concat(dfs, ignore_index=True) if dfs else pd.DataFrame()
df_vax = _concat_group(vax_glob)
df_syms = _concat_group(syms_glob)
df_data_all = _concat_group(data_glob)
''' Remove all reports prior to the first covid jab. [That was the thinking but now in 2025 with all CDC drops (at last), widen to all vaccines?]
2020-12-14 902418 First report after public rollout is next day: https://www.medalerts.org/vaersdb/findfield.php?IDNUMBER=902418&WAYBACKHISTORY=ON
2020-10-02 896636 may have been from a trial: https://www.medalerts.org/vaersdb/findfield.php?IDNUMBER=896636&WAYBACKHISTORY=ON
VAERS ID: 896636
VAERS Form: 2
Age: 47.0
Sex: Female
Location: South Carolina
Vaccinated: 2020-09-28
Onset: 2020-10-02
Submitted: 0000-00-00
Entered: 2020-11-14
Vaccination / Manufacturer (1 vaccine) Lot / Dose Site / Route
COVID19: COVID19 (COVID19 (MODERNA)) / MODERNA - / UNK LA / SYR
2020-12-10 970043 Died same day, reported later: https://www.medalerts.org/vaersdb/findfield.php?IDNUMBER=970043&WAYBACKHISTORY=ON
'''
# Automatically detect earliest covid VAERS_ID. TODO: Do earlier in the code?
df_vax_all_covid = df_vax.loc[ df_vax[ 'VAX_TYPE' ].str.lower().str.contains('covid', na=False) ]
if len(df_vax_all_covid):
covid_earliest_vaers_id_now = df_vax_all_covid.VAERS_ID.astype(int).min()
if covid_earliest_vaers_id == 0: # no previous VAERS_ID value
covid_earliest_vaers_id = covid_earliest_vaers_id_now
print(f' Earliest covid VAERS_ID initiated at {covid_earliest_vaers_id}\n')
elif covid_earliest_vaers_id > covid_earliest_vaers_id_now:
# was previously set but is even lower now (altho published later)
print(f' Earliest covid VAERS_ID set EVEN LOWER now at {covid_earliest_vaers_id}, was {covid_earliest_vaers_id}\n')
covid_earliest_vaers_id = covid_earliest_vaers_id_now
else:
print(f' No covid found in {files_date_marker}\n')
# Cheap trick to avoid running the same input again, and this should be improved. Quite the logic puzzle. Trimming files_date_marker from inputs list.
files[ 'input' ][ 'date' ] = [x for x in files['input' ]['date'] if x != files_date_marker]
return
stats['lo_ever'] = covid_earliest_vaers_id # TODO: 2025, wrong? right?
if vids_limit: # in testing special case
df_data_all = df_data_all.loc[ df_data_all.VAERS_ID.isin(vids_limit) ]
df_vax = df_vax .loc[ df_vax .VAERS_ID.isin(vids_limit) ]
df_syms = df_syms .loc[ df_syms .VAERS_ID.isin(vids_limit) ]
len_before = len(df_data_all)
df_data_all = df_data_all.loc[ df_data_all.VAERS_ID >= covid_earliest_vaers_id ]
df_vax = df_vax .loc[ df_vax .VAERS_ID >= covid_earliest_vaers_id ]
df_syms = df_syms .loc[ df_syms .VAERS_ID >= covid_earliest_vaers_id ]
print() ; print(f'{len_before - len(df_data_all):>10} records removed prior to the first covid report (covid_earliest_vaers_id {covid_earliest_vaers_id})')
print(f'{len(df_data_all):>10} covid vax reports to work with (unique VAERS_IDs)') ; print()
vids_list = df_data_all.VAERS_ID.to_list()
lo_data_all = min(vids_list)
hi_data_all = max(vids_list)
diff_data_all = hi_data_all - lo_data_all
len_data_all = len(vids_list)
missing = diff_data_all - len_data_all
print(f'{missing:>10} missing (any/all vax never published in covid era) is implied by only {len_data_all} present in {diff_data_all} range with lo {lo_data_all} and hi {hi_data_all}')
print()
vids_new = [ x for x in vids_list if x not in ever_any ]
print(f'{len(vids_new):>10} being added to `ever_any` for this drop, any/all vax')
ever_any.update({x:1 for x in vids_list})
#ever_any = {**ever_any, **{x:1 for x in vids_list}}
#ever_any = {x:1 for x in set(ever_any.keys() + vids_list)}
# Grab all VAERS_IDs before filtering for covid, to be able to identify gaps properly
do_never_ever( vids_list, files_date_marker, 'consolidate on df_data_all' )
stats['drop_input_covid'] = len(df_data_all)
''' VAERS_IDs can have multiple records/doses/lots in each report but at least one covid19. '''
dfv = df_vax.copy()
dfv['doses'] = dfv.VAERS_ID.map(dfv.VAERS_ID.value_counts())
dfv_single_doses = dfv.loc[ dfv.doses.eq(1) ]
dfv_singles_w_covid = dfv_single_doses.loc[ dfv_single_doses.VAERS_ID.isin(dfv_single_doses.loc[ dfv_single_doses.VAX_TYPE.str.contains('COVID', na=False) ].VAERS_ID.to_list()) ].sort_values(by='VAERS_ID')
dfv_multpl_doses = dfv.loc[ dfv.doses.ge(2) ]
dfv_multiples_w_covid = dfv_multpl_doses.loc[ dfv_multpl_doses.VAERS_ID.isin(dfv_multpl_doses.loc[ dfv_multpl_doses.VAX_TYPE.str.contains('COVID', na=False) ].VAERS_ID.to_list()) ].sort_values(by='VAERS_ID')
del dfv
dfv_covid_type_both = pd.concat([dfv_singles_w_covid.reset_index(drop=True), dfv_multiples_w_covid.reset_index(drop=True) ], ignore_index=True)
dfv_covid_type_both = dfv_covid_type_both.drop_duplicates()
df_data_other_for_covid_search = df_data_all.loc[ ~df_data_all.VAERS_ID.isin(dfv_covid_type_both.VAERS_ID) ]
df_data_covid_other_found1 = df_data_other_for_covid_search.loc[ df_data_other_for_covid_search.SYMPTOM_TEXT.str.contains(r'Pfizer|Moderna|Janssen', re.IGNORECASE, na=False) ]
df_data_covid_other_found2 = df_data_covid_other_found1 .loc[ df_data_covid_other_found1 .SYMPTOM_TEXT.str.contains(r'Covid', re.IGNORECASE, na=False) ]
print() ; print(f'{len(df_data_covid_other_found2):>10} additional reports captured with Pfizer|Moderna|Janssen and Covid in SYMPTOM_TEXT although not officially type COVID') ; print()
vids_vax__covid_type_both = dfv_covid_type_both .VAERS_ID.to_list()
vids_data_covid_other_found = df_data_covid_other_found2.VAERS_ID.to_list()
vids_all_covid_list = sorted( set(vids_vax__covid_type_both + vids_data_covid_other_found) )
# Reducing to only covid reports
len_data_all_prev_any = len(df_data_all)
df_data = df_data_all.loc[ df_data_all.VAERS_ID.isin(vids_all_covid_list) ]
df_vax = df_vax .loc[ df_vax .VAERS_ID.isin(vids_all_covid_list) ]
df_syms = df_syms .loc[ df_syms .VAERS_ID.isin(vids_all_covid_list) ]
print(f'{ len_data_all_prev_any:>10} total any vax reports') ; print()
print(f'{len_data_all_prev_any - len(df_data):>10} non-covid vax reports excluded') ; print()
stats['drop_input_covid'] = len(df_data)
#do_ever_covid( vids_all_covid_list ) # Only covid reports, subset of ever_any which is all vax
len_before = len(df_data)
df_data = df_data.drop_duplicates(subset='VAERS_ID')
if len(df_data) - len_before:
print(f'{(len_before - len(df_data)):>10} duplicates dropped in df_data on VAERS_IDs')
print(f'{len(set(df_data.VAERS_ID.to_list())):>10} covid reports to work with') ; print()
''' Shorten some fields in VAX (the change to title case also indicates I've touched it) '''
print(' Shortening some field values in VAX_NAME, VAX_MANU') ; print()
'''
VAX_TYPE
COVID19
COVID19-2
FLU4
VAX_NAME Shorter
COVID19 (COVID19 (MODERNA)) C19 Moderna
COVID19 (COVID19 (MODERNA BIVALENT)) C19 Moderna BIVALENT
COVID19 (COVID19 (PFIZER-BIONTECH)) C19 Pfizer-BionT
COVID19 (COVID19 (PFIZER-BIONTECH BIVALENT)) C19 Pfizer-BionT BIVALENT
COVID19 (COVID19 (JANSSEN))
COVID19 (COVID19 (UNKNOWN))
COVID19 (COVID19 (NOVAVAX))
INFLUENZA (SEASONAL) (FLULAVAL QUADRIVALENT)
Transitional with parens removed first step
COVID19 COVID19 MODERNA
COVID19 COVID19 MODERNA BIVALENT
COVID19 COVID19 PFIZER-BIONTECH
COVID19 COVID19 PFIZER-BIONTECH BIVALENT
COVID19 COVID19 JANSSEN
INFLUENZA SEASONAL FLULAVAL QUADRIVALENT '''
df_vax = df_vax.copy()
# VAX_NAME and VAX_MANU replacement chains are independent — run them in two threads.
def _clean_vax_name(s):
    ''' Shorten VAX_NAME values with an ordered chain of regex substitutions.

    s is a pandas Series of vaccine-name strings; a new Series is returned.
    Order matters: parentheses are stripped first, then any run of
    "COVID<suffix> " tokens collapses to a single "C<suffix> ".
    Note that 'PFIZER.BION.*' consumes everything to the end of the value,
    so a trailing word such as BIVALENT is dropped for Pfizer-BioNTech names. '''
    substitutions = (
        (r'[\)\(]',                r''),
        (r'(?:COVID(\S+)\s)+',     r'C\1 '),
        (r'PFIZER.BION.*',         r'Pfizer-BionT'),
        (r'MODERNA',               r'Moderna'),
        (r'JANSSEN',               r'Janssen'),
        (r'INFLUENZA',             r'Flu'),
        (r'VACCINE NOT SPECIFIED', r'Not Specified'),
    )
    shortened = s
    for pattern, replacement in substitutions:
        shortened = shortened.str.replace(pattern, replacement, regex=True)
    return shortened
def _clean_vax_manu(s):
    ''' Shorten VAX_MANU values with an ordered chain of regex substitutions.

    s is a pandas Series of manufacturer strings; a new Series is returned.
    Most patterns end in '.*', so the whole remainder of the value after the
    matched keyword is replaced by the short title-case name. '''
    substitutions = (
        (r'UNKNOWN MANUFACTURER', r'Unknown'),
        (r'PFIZER.BION.*',        r'Pfizer-BionT'),
        (r'MODERNA',              r'Moderna'),
        (r'JANSSEN',              r'Janssen'),
        (r'PF.*WYETH',            r'Pfizer-Wyeth'),
        (r'NOVARTIS.*',           r'Novartis'),
        (r'SANOFI.*',             r'Sanofi'),
        (r'MERCK.*',              r'Merck'),
        (r'PROTEIN.*',            r'Protein'),
        (r'GLAXO.*',              r'Glaxo'),
        (r'SEQIRUS.*',            r'Seqirus'),
        (r'BERNA.*',              r'Berna'),
    )
    shortened = s
    for pattern, replacement in substitutions:
        shortened = shortened.str.replace(pattern, replacement, regex=True)
    return shortened
with ThreadPoolExecutor(max_workers=2) as ex:
_fut_name = ex.submit(_clean_vax_name, df_vax['VAX_NAME'].copy())
_fut_manu = ex.submit(_clean_vax_manu, df_vax['VAX_MANU'].copy())
df_vax['VAX_NAME'] = _fut_name.result()
df_vax['VAX_MANU'] = _fut_manu.result()
# Sort on VAERS_ID plus every vax column so row order stays stable between drops that differ only slightly. TODO: confirm why this is required.
df_vax = df_vax.sort_values(by=['VAERS_ID', 'VAX_LOT', 'VAX_SITE', 'VAX_DOSE_SERIES', 'VAX_TYPE', 'VAX_MANU', 'VAX_ROUTE', 'VAX_NAME'])
print(f' Merging DATA into VAX ({NUM_CORES} cores)')
_vax_s, _data_s = _pool_astype_str(df_vax, df_data)
df_data_vax = _merge_parallel(_vax_s, _data_s)
del _vax_s, _data_s
if nan_alert(df_data_vax):
pause=1 # for a breakpoint sometimes
df_data_vax = df_data_vax.fillna('')
print(f'{len(df_data_vax):>10} rows in df_data_vax (can be more than one row per VAERS_ID)')
'''
= = = = = = = = = = = = = = = = = = = = = = = = = = = = = =
symptom_entries
= = = = = = = = = = = = = = = = = = = = = = = = = = = = = =
'''
''' Aggregate symptoms into new column called symptom_entries, and merge for all symptoms covered as a single string '''
print(' Aggregating symptoms into symptom_entries string, new column')
df_syms_flat = symptoms_file_entries_append_to_symptom_text(df_syms)
'''
= = = = = = = = = = = = = = = = = = = = = = = = = = = = = =
Consolidation
Save result into one file like ... 2020-12-25_VAERS_CONSOLIDATED.csv
More than one row per VAERS_ID for the various doses/lots, thus duplicates to symptom_entries etc.
= = = = = = = = = = = = = = = = = = = = = = = = = = = = = =
'''
''' symptom_entries to consolidated '''
print(f' Merging symptom_entries into df_data_vax ({NUM_CORES} cores)')
_dvx_s, _sym_s = _pool_astype_str(df_data_vax, df_syms_flat, cols1=['VAERS_ID', 'symptom_entries'])
df_data_vax_syms_consolidated = _merge_parallel(_dvx_s, _sym_s)
del _dvx_s, _sym_s
#nan_alert(df_data_vax_syms_consolidated) # nan in symptom_entries in 1106891 on 03/10/2021 as the symptoms file has no entry line for it even tho Current Illness: Atrial Fibrillation
df_data_vax_syms_consolidated = df_data_vax_syms_consolidated.fillna('')
print(f'{len(df_data_vax_syms_consolidated):>10} rows in df_data_vax_syms_consolidated') # has dupe VAERS_IDs as expected, more than one dose per report
filename_consolidated = dir_consolidated + '/' + files_date_marker + '_VAERS_CONSOLIDATED.csv'
print(f' Saving result into one file w/ multiple rows per VAERS_ID: {filename_consolidated}')
write_to_csv(df_data_vax_syms_consolidated, filename_consolidated)
print()
print(f' Consolidation of {files_date_marker} done')
return
# Integrated from original: flatten()
def flatten(files_date_marker):
    '''
    = = = = = = = = = = = = = = = = = = = = = = = = = = = = = =
    Flattening -- Only one row per VAERS_ID
    = = = = = = = = = = = = = = = = = = = = = = = = = = = = = =

    Build the flattened frame for one drop and save it as
    <dir_flattened>/<files_date_marker>_VAERS_FLATTENED.csv.

    files_date_marker: date key of the drop, used in file names (like 2020-12-25).

    Steps, in order:
      1. Return early when covid_earliest_vaers_id is falsy (the date is appended
         to files['done']) or when files['flattened'] already has this date.
      2. When the global df_vax is empty, reload df_vax, df_data and df_syms_flat
         from the consolidated file of this date.
      3. Collapse the VAX rows to one row per VAERS_ID.
      4. Merge the data columns and symptom_entries, drop fully duplicated rows,
         run symptoms_dedupe_repeat_sentences and write the CSV.

    Side effects: rebinds the globals df_vax, df_data, df_syms_flat (when reloading),
    deletes the global names df_vax and df_syms_flat, sets the global df_flat_2 and
    dict_done_flag['flat2'], and fills stats['drop_input_covid'] when it is falsy.
    Returns None. '''
    global df_vax, df_data, df_syms_flat, df_flat_2

    if not covid_earliest_vaers_id:
        files['done'].append(files_date_marker)
        return

    print()
    print(f' Flattening')
    files_populate_information()

    # Flatten only if the file doesn't already exist
    if files_date_marker in files['flattened']['date']:
        if files_date_marker in files['flattened']['keyval']:   # already done, use it
            print(f" Flattening already done: {files['flattened']['keyval'][files_date_marker]}") ; print()
            return
        else:
            # Reported only; processing continues below.
            print(f" ERROR: Expected {files_date_marker} in files['flattened']['keyval']")

    pull_vax_records = 0
    file_consolidated = ''
    if not len(df_vax):   # already exists if consolidate just created it
        # vax file records from consolidate file
        if files_date_marker in files['consolidated']['keyval']:
            file_consolidated = files['consolidated']['keyval'][files_date_marker]
            print(f" Pulling vax records from previously consolidated file {file_consolidated}")
            pull_vax_records = 1
        else:
            print(f" ERROR: Expected {files_date_marker} in files['consolidated']['keyval']")

    if pull_vax_records:
        df_consolidated = open_file_to_df(file_consolidated, doprint=1).fillna('')
        columns_vax = ['VAERS_ID', 'VAX_TYPE', 'VAX_MANU', 'VAX_LOT', 'VAX_DOSE_SERIES', 'VAX_ROUTE', 'VAX_SITE', 'VAX_NAME']   # Note VAX_DATE is in data, not vax file
        columns_consolidated = df_consolidated.columns
        # Report-level columns = everything that is neither a vax column nor symptom_entries.
        # Filter in the consolidated file's own column order: the previous set-difference
        # produced an order that depends on string hashing and so could differ between runs.
        _excluded = set(columns_vax) | {'symptom_entries'}
        columns_data = ['VAERS_ID'] + list(dict.fromkeys(c for c in columns_consolidated if c not in _excluded))

        # Three independent slices — convert to str in parallel.
        # vaers36: groupby('VAERS_ID').first() collapses the possibly vax-expanded
        # consolidated rows back to one-per-VID for the report-level data columns
        # (SYMPTOM_TEXT etc) and for symptom_entries.
        # NOTE(review): first() skips only NA values. Because of the fillna('') above,
        # blank cells are '' here and count as values, so a blank in the first row of a
        # VID is still what gets picked — confirm whether that matches the intent.
        with ThreadPoolExecutor(max_workers=3) as ex:
            _fv = ex.submit(lambda: df_consolidated[columns_vax].astype(str))
            _fd = ex.submit(lambda: df_consolidated[columns_data].groupby('VAERS_ID', sort=False, as_index=False).first().astype(str))
            _fs = ex.submit(lambda: df_consolidated[['VAERS_ID', 'symptom_entries']].groupby('VAERS_ID', sort=False, as_index=False).first().astype(str))
            df_vax = _fv.result()
            df_data = _fd.result()
            df_syms_flat = _fs.result()
        del file_consolidated, pull_vax_records, columns_vax, columns_data

    if not stats['drop_input_covid']:
        stats['drop_input_covid'] = len(df_data)
        print(f"=== Added count_input {stats['drop_input_covid']} in def flattening")

    # Aggregate VAX for single VAERS_ID
    print(f' Aggregate/flatten VAX items. Grouping by VAERS_ID ({NUM_CORES} cores)')
    df_vax = df_vax.reset_index(drop=True)
    _vids_unique = df_vax['VAERS_ID'].unique()
    if NUM_CORES > 1 and len(_vids_unique) > NUM_CORES * 100:
        # map/groupby split: O(n) single pass instead of O(n×NUM_CORES) isin() loop
        # Each VAERS_ID is assigned to exactly one chunk, so all of its rows stay together.
        _vax_chunk_map = dict(zip(_vids_unique, np.arange(len(_vids_unique)) % NUM_CORES))
        df_vax['_chunk'] = df_vax['VAERS_ID'].map(_vax_chunk_map)
        _vax_chunks = [grp.drop(columns='_chunk').reset_index(drop=True)
                       for _, grp in df_vax.groupby('_chunk', sort=False) if len(grp)]
        df_vax = df_vax.drop(columns='_chunk')
        with Pool(min(NUM_CORES, len(_vax_chunks))) as pool:
            if SHOW_PROGRESS:
                _agg_results = list(tqdm(
                    pool.imap(_agg_vax_chunk, _vax_chunks),
                    total=len(_vax_chunks), desc=" Aggregating VAX", unit="chunk", leave=True
                ))
            else:
                _agg_results = pool.map(_agg_vax_chunk, _vax_chunks)
        # presumably _agg_vax_chunk performs the same per-VAERS_ID '|' join as the
        # single-process branch below — verify against its definition
        df_vax_flat = pd.concat(_agg_results).reset_index()
    else:
        # Every column becomes the '|'-joined list of its values for that VAERS_ID.
        df_vax_flat = df_vax.groupby('VAERS_ID').agg(list).map('|'.join).reset_index()
    # NOTE(review): df_vax is declared global, so this removes the module-level name;
    # a later len(df_vax) needs it re-bound elsewhere first — confirm with the callers.
    del df_vax   # done with this global

    len_before = len(df_vax_flat)
    df_vax_flat = df_vax_flat.drop_duplicates(subset=df_vax_flat.columns, keep='first')
    if len(df_vax_flat) - len_before:
        print(f'\n\n\n\n\t\t\t\t {(len_before - len(df_vax_flat)):>10} duplicates dropped in df_vax_flat, THERE SHOULD BE NONE\n\n\n\n')
    check_dupe_vaers_id(df_vax_flat) ; print()

    # Combine DATA into VAX (has multiple records for each VAERS_ID, when more than one dose is listed)
    print(f' Merging DATA into VAX flattened ({NUM_CORES} cores)')
    _vax_s, _data_s = _pool_astype_str(df_vax_flat, df_data)
    df_data_vax_flat = _merge_parallel(_vax_s, _data_s)
    del _vax_s, _data_s
    if nan_alert(df_data_vax_flat):
        pause=1   # for a breakpoint sometimes
    df_data_vax_flat = df_data_vax_flat.fillna('')
    check_dupe_vaers_id(df_data_vax_flat)
    len_before = len(df_data_vax_flat)
    df_data_vax_flat = df_data_vax_flat.drop_duplicates(subset=df_data_vax_flat.columns, keep='first')
    if len(df_data_vax_flat) - len_before:
        print(f'{(len_before - len(df_data_vax_flat)):>10} duplicates dropped in df_data_vax_flat')
    print(f'{len(df_data_vax_flat):>10} rows in df_data_vax_flat') ; print()

    # symptom_entries to flattened
    print(f' Merging symptom_entries into df_data_vax_syms_flat ({NUM_CORES} cores)')
    _dvf_s, _sym_s = _pool_astype_str(df_data_vax_flat, df_syms_flat, cols1=['VAERS_ID', 'symptom_entries'])
    df_data_vax_syms_flat = _merge_parallel(_dvf_s, _sym_s)
    del _dvf_s, _sym_s
    df_data_vax_syms_flat = df_data_vax_syms_flat.fillna('')   # there are nans like 1106891 with no record for it in ...symptoms.csv
    # As with df_vax above, this deletes the module-level name.
    del df_syms_flat
    len_before = len(df_data_vax_syms_flat)   # TODO: Understand the reason for the dupes
    df_data_vax_syms_flat = df_data_vax_syms_flat.drop_duplicates(subset=df_data_vax_syms_flat.columns, keep='first')
    if len(df_data_vax_syms_flat) - len_before:
        print(f'{(len_before - len(df_data_vax_syms_flat)):>10} duplicates dropped in df_data_vax_syms_flat') ; print()

    # SMALLER SYMPTOM_TEXT, REMOVAL OF REPEAT SENTENCES, DEDUPE
    # vaers36: the dedupe now has multiple layers of "never blank a VID that had text"
    # (worker try, and caller map + pre-sym fallback). This directly addresses the
    # systematic SYMPTOM_TEXT='' in vaers35.py FINAL outputs for high-edit reports.
    df_data_vax_syms_flat = symptoms_dedupe_repeat_sentences(df_data_vax_syms_flat)

    # Save result into one file like ... 2020-12-25_VAERS_FLATTENED.csv
    filename_flattened = dir_flattened + '/' + files_date_marker + '_VAERS_FLATTENED.csv'
    print(f' Saving result into one file: {filename_flattened}') ; print()
    write_to_csv(df_data_vax_syms_flat, filename_flattened)
    print(f'{len(df_data_vax_syms_flat):>10} rows with unique VAERS_IDs in {filename_flattened}') ; print()
    print(f' Flattening of {files_date_marker} done') ; print()

    # Keep the result in memory and record which drop it belongs to.
    df_flat_2 = df_data_vax_syms_flat
    dict_done_flag['flat2'] = files_date_marker
    return
'''
Some VAERS_ID in 2021-01-07 for example with multiple lots/doses (multiple rows in VAX file) for testing
903999 903157 903848 903867 904522 905340 906428 907571 907776 907837 908431 908448 908763 909370 909520 910580 919620
18 doses: https://medalerts.org/vaersdb/findfield.php?IDNUMBER=1900339&WAYBACKHISTORY=ON '''
# Integrated from original: compare()
def compare(date_currently):
'''
Compare based on flattened (one row per VAERS_ID with each row containing all of the information)
saving modifications to changes column.
Add reports to result set when not already there.
VAX file columns:
VAERS_ID VAX_TYPE VAX_MANU VAX_LOT VAX_DOSE_SERIES VAX_ROUTE VAX_SITE VAX_NAME
There are multiple entries with same VAERS_ID, different VAX_LOT and VAX_DOSE_SERIES '''
global df_flat_1
if not covid_earliest_vaers_id:
return
files_populate_information()
file_flattened_previous = get_flattened('previous', date_currently) # like ... 2022-10-28_VAERS_FLATTENED.csv
file_flattened_working = get_flattened('working' , date_currently) # like ... 2022-11-04_VAERS_FLATTENED.csv
if not file_flattened_previous:
print() ; print(f' No flattened file to compare prior to this, {date_currently}, skip compare')
file_for_copy_original = dir_compared + '/' + date_currently + '_VAERS_FLATFILE.csv'
print(f' Due to first drop, copying current {file_flattened_working} to: {file_for_copy_original}') ; print()
shutil.copy(file_flattened_working, file_for_copy_original)
# Give it these columns for consistency
df_flat_init = open_file_to_df(file_for_copy_original, doprint=0)
df_flat_init['cell_edits'] = 0
df_flat_init['status' ] = ''
df_flat_init['changes' ] = ''
df_flat_init = set_columns_order(df_flat_init)
write_to_csv(df_flat_init, file_for_copy_original, open_file=0)
# Initial row in stats file when only one compare/changes file
do_never_ever( df_flat_init.VAERS_ID.to_list(), date_currently, 'compare on df_flat_init')
do_ever_covid( sorted(set(df_flat_init.VAERS_ID.to_list())) )
stats_resolve(date_currently)
return
previous_date = date_from_filename(file_flattened_previous)
if not previous_date: # first run?
exit('No previous_date, cannot continue')
''' Load previous and current flattened files — read from disk in parallel if needed '''
_prv_in_mem = len(df_flat_1) > 0
_new_in_mem = len(df_flat_2) > 0
if _prv_in_mem:
print(f" Using flat {dict_done_flag['flat1']} already in memory, {len(df_flat_1)} rows")
df_flat_prv = df_flat_1.copy()
if _new_in_mem:
print(f" Using flat {dict_done_flag['flat2']} already in memory, {len(df_flat_2)} rows") ; print()
df_flat_new = df_flat_2.copy()
df_flat_new = types_set(df_flat_new)
if not _prv_in_mem and not _new_in_mem:
print(f' Reading both flat files in parallel ({NUM_CORES} cores)')
with ThreadPoolExecutor(max_workers=2) as ex:
fut_prv = ex.submit(open_file_to_df, file_flattened_previous, 1)
fut_new = ex.submit(open_file_to_df, file_flattened_working, 1)
df_flat_prv = fut_prv.result()
df_flat_new = fut_new.result()
check_dupe_vaers_id(df_flat_prv)
check_dupe_vaers_id(df_flat_new)
elif not _prv_in_mem:
df_flat_prv = open_file_to_df(file_flattened_previous, doprint=1)
check_dupe_vaers_id(df_flat_prv)
elif not _new_in_mem:
df_flat_new = open_file_to_df(file_flattened_working, doprint=1)
check_dupe_vaers_id(df_flat_new)
if not stats['drop_input_covid']:
stats['drop_input_covid'] = len(df_flat_new)
print(f"=== Added count_input {stats['drop_input_covid']} in def compare")
dict_done_flag['flat1'] = date_currently
''' cell_edits and changes fields only exist in the changes file
Have to be pulled from there to be updated when applicable '''
print() ; print(f' Previous changes file for changes, cell_edits and status columns')
''' Previous changes filename, conditions, for compare '''
# KEEP FOR NOW if len(df_flat_prv):
# KEEP FOR NOW print(f" Using changes {dict_done_flag['changes']} already in memory")
# KEEP FOR NOW elif len( files['changes']['date'] ):
# KEEP FOR NOW dates_changes = files['changes']['date']
# KEEP FOR NOW date_prv_candidates = [x for x in dates_changes if x < date_currently]
# KEEP FOR NOW date_changes_prv = date_prv_candidates[-1] # most recent
# KEEP FOR NOW filename_changes_previous = files['changes']['keyval'][date_changes_prv] # like TODO
# KEEP FOR NOW df_flat_prv = open_file_to_df(filename_changes_previous, doprint=1)
# KEEP FOR NOW del dates_changes, date_prv_candidates, date_changes_prv, filename_changes_previous
# KEEP FOR NOW
# KEEP FOR NOW else:
# KEEP FOR NOW ''' Resort to initializing with flattened. '''
# KEEP FOR NOW print(f' Due to first CHANGES, using {file_flattened_previous}')
# If first run, it needs these columns since only changes files contain these columns
if 'cell_edits' not in df_flat_prv.columns: df_flat_prv['cell_edits'] = int(0)
if 'changes' not in df_flat_prv.columns: df_flat_prv['changes' ] = ''
if 'status' not in df_flat_prv.columns: df_flat_prv['status' ] = ''
df_flat_prv = df_flat_prv.sort_values(by=['cell_edits', 'status', 'changes'], ascending=False)
if nan_alert(df_flat_prv):
pause=1 # nans in ['AGE_YRS', 'CAGE_YR', 'CAGE_MO', 'HOSPDAYS', 'NUMDAYS']
del file_flattened_previous, file_flattened_working
#df_flat_prv_ori = df_flat_prv.copy() ; df_flat_new_ori = df_flat_new.copy()
print()
print(f'{" "*30} Comparing')
print() ; print(f'{" "*30} {previous_date} v. {date_currently}')
'''
1. Identical in both prv and new
2. Excess in prv
a. Deleted in new
b.
3. Excess in new
a. Brand new above highest prv ID
b. Restored
c.
4. Both prv and new remaining
a. Changed
'''
# tmp for testing, normally done in flat but using flat files cooked up manually
if use_test_cases:
df_flat_new = symptoms_dedupe_repeat_sentences(df_flat_new)
df_flat_prv = symptoms_dedupe_repeat_sentences(df_flat_prv)
print()
print(f'{ len(df_flat_new):>10} this drop total covid')
print(f'{ len(df_flat_prv):>10} previous total covid')
''' Initialize df_edits, starting with df_flat_new,
then they will be changed in some cases later, eventually going to df_changes_done '''
# for ensuring all present on return
df_both_flat_inputs = pd.concat([df_flat_prv.reset_index(drop=True), df_flat_new.reset_index(drop=True) ], ignore_index=True).drop_duplicates(subset='VAERS_ID')
df_edits = df_flat_new.copy()
df_changes_done = pd.DataFrame(columns=list(df_flat_prv.columns))
list_vids_changes_in_edits = df_flat_prv.loc[ df_flat_prv.VAERS_ID.isin(df_edits .VAERS_ID) ].VAERS_ID.to_list()
list_vids_edits_in_prv = df_edits.loc [ df_edits .VAERS_ID.isin(df_flat_prv.VAERS_ID) ].VAERS_ID.to_list()
list_vids_in_both = list( set(list_vids_changes_in_edits) & set(list_vids_edits_in_prv) )
print(f'{len(list_vids_in_both):>10} in both previous and new, copying their values in edits, status, changes')
# Bring in the previous changes etc
list_vids_both_only_to_copy = sorted( set(df_flat_prv.VAERS_ID.to_list()) & set(df_edits.VAERS_ID.to_list()) )
list_vids_w_changes_in_both = df_flat_prv.loc[df_flat_prv.cell_edits.ne(0) | df_flat_prv.changes.ne('') | df_flat_prv.status.ne('')].VAERS_ID.to_list()
# Only those with meaningful (changed values) to copy
list_vids_filtered_both_only_to_copy = sorted( set(list_vids_both_only_to_copy) & set(list_vids_w_changes_in_both) )
if len(list_vids_w_changes_in_both):
# Replace 3 sequential copy_column calls (each: isin filter + copy + concat × 2)
# with a single vectorized map over the same filter — ~6× fewer large operations.
_vids_set = set(list_vids_filtered_both_only_to_copy)
_edit_mask = df_edits.VAERS_ID.isin(_vids_set)
_prv_lookup = df_flat_prv.loc[df_flat_prv.VAERS_ID.isin(_vids_set)].set_index('VAERS_ID')
_edit_vids = df_edits.loc[_edit_mask, 'VAERS_ID']
for _col in ('cell_edits', 'changes', 'status'):
df_edits.loc[_edit_mask, _col] = _edit_vids.map(_prv_lookup[_col]).values
df_edits = df_edits.sort_values(['cell_edits', 'status', 'changes'], ascending=False)
# Ensure types are set correctly regardless of whether there are changes
df_edits = types_set(df_edits)
warn_mixed_types(df_edits)
if nan_alert(df_edits):
pause=1 # for breakpoint in debugger
# Add any others from df_flat_prv not in df_edits
df_flat_prv_not_in_edits = df_flat_prv.loc[ ~df_flat_prv.VAERS_ID.isin(df_edits.VAERS_ID) ] # PICKS UP DELETED ALSO
print(f' Concat of {len(df_flat_prv_not_in_edits)} previous that were not in edits')
if len(df_flat_prv_not_in_edits):
df_edits = pd.concat([ df_edits.reset_index(drop=True), df_flat_prv_not_in_edits.reset_index(drop=True) ], ignore_index=True)
# After concat, types may get mixed up, so reset them
df_edits = types_set(df_edits)
''' Identical. prv and new
Set cell_edits 0 and nothing in changes column or status.
Move to df_changes_done (removing from df_flat_prv and df_flat_new) '''
print(f' Moving identical rows to done ({NUM_CORES} cores)')
# astype(str) for object arrays is GIL-bound — threads give no real parallelism.
# Use a forked Pool: workers inherit both DataFrames via CoW, each converts a
# column-subset, returning small partial DataFrames that we concat back.
global _g_df_pair, _g_pb_cmp, _g_nb_cmp
_g_df_pair = [df_flat_prv, df_flat_new]
_prv_col_chunks = [list(c) for c in np.array_split(df_flat_prv.columns.tolist(),
min(NUM_CORES, len(df_flat_prv.columns)))]
_new_col_chunks = [list(c) for c in np.array_split(df_flat_new.columns.tolist(),
min(NUM_CORES, len(df_flat_new.columns)))]
_astype_tasks = [(0, c) for c in _prv_col_chunks] + [(1, c) for c in _new_col_chunks]
with Pool(min(NUM_CORES, len(_astype_tasks))) as _apool:
_aresults = _apool.map(_astype_str_fillna_pair_chunk, _astype_tasks)
_g_df_pair = []
_prv_s = pd.concat(_aresults[:len(_prv_col_chunks)], axis=1)
_new_s = pd.concat(_aresults[len(_prv_col_chunks):], axis=1)
_in_both = set(_prv_s['VAERS_ID'].tolist()) & set(_new_s['VAERS_ID'].tolist())
if _in_both:
_pb = _prv_s[_prv_s['VAERS_ID'].isin(_in_both)].sort_values('VAERS_ID').reset_index(drop=True)
_nb = _new_s[_new_s['VAERS_ID'].isin(_in_both)].sort_values('VAERS_ID').reset_index(drop=True)
_cols = [c for c in _pb.columns if c in _nb.columns]
del _prv_s, _new_s
# Element-wise string comparison is also GIL-bound — split columns across workers.
_g_pb_cmp, _g_nb_cmp = _pb, _nb
_cmp_chunks = [list(c) for c in np.array_split(_cols, min(NUM_CORES, len(_cols)))]
with Pool(len(_cmp_chunks)) as _cpool:
_same_arrays = _cpool.map(_compare_rows_col_chunk, _cmp_chunks)
_g_pb_cmp = _g_nb_cmp = None
_same_np = np.logical_and.reduce(_same_arrays) if len(_same_arrays) > 1 else _same_arrays[0]
vids_identical = _pb.loc[pd.Series(_same_np, index=_pb.index), 'VAERS_ID'].astype(int).tolist()
del _pb, _nb
else:
vids_identical = []
del _prv_s, _new_s
# Move identicals to done, also remove these IDs in prv and new
if vids_identical:
df_edits, df_changes_done = move_rows(df_edits.loc[ df_edits.VAERS_ID.isin(vids_identical) ], df_edits, df_changes_done)
print(f' Removing identicals done from previous and new')
df_flat_prv = df_flat_prv.loc[ ~df_flat_prv.VAERS_ID.isin(vids_identical) ]
df_flat_new = df_flat_new.loc[ ~df_flat_new.VAERS_ID.isin(vids_identical) ]
print(f'{len(vids_identical):>10} identical set aside into df_changes_done (first content)')
######del df_merged, df_identical, vids_identical
print(f'{len(df_flat_new):>10} this drop remaining to work with')
print(f'{len(df_flat_prv):>10} previous remaining to work with')
print(f'{len(df_flat_new) - len(df_flat_prv):>10} difference')
# New stuff can be one of three things: higher than the highest prv VAERS_ID or gapfill or the restore of a deleted report
list_new_not_in_prv = df_flat_new.loc[ ~df_flat_new.VAERS_ID.isin(df_flat_prv.VAERS_ID) ].VAERS_ID.to_list()
''' Deleted. Flat prv-only
897309 NOT DELETED 897309 42 Deleted 2020-12-25 0 U USPFIZER INC2020443518 2020-11-17 2020-11-18 probably polycellulitis; The whole half of the arm was really red and swollen; The whole half of the arm was really red and swollen; This is a spontaneous report from a contactable pharmacist. A patient of unspecified age and gender received pneumococcal 13-val conj vac (dipht crm197 protein) (PREVNAR 13), intramuscular on an unspecified date at single dose for immunization. The patient's medical history and concomitant medications were not reported. The patient experienced polycellulitis, the whole half of the arm was really red and swollen on an unspecified date. The reporter mentioned that a patient developed something that the reporter thought was probably polycellulitis after shot of unknown pneumonia vaccine product; product might have been Prevnar 13 but reporter was not sure. The patient called the doctor who sent them to a clinic to be seen that day and think theirs was maybe 3 days since they had gotten the unknown pneumonia vaccine if the reporter remembered correctly. This occurred maybe 6 months ago. The reporter just remembered the whole half of the arm was really red and swollen. The outcome of the events was unknown. Lot/batch number has been requested.; Sender's Comments: A contributory role of PREVNAR 13 cannot be fully excluded in triggering the onset of probably polycellulitis with the whole half of the arm was really red and swollen. The impact of this report on the benefit/risk profile of the Pfizer product is evaluated as part of Pfizer procedures for safety evaluation, including the review and analysis of aggregate data for adverse events. Any safety concern identified as part of this review, as well as any appropriate action in response, will be promptly notified to Regulatory Authorities, Ethics Committees and Investigators, as appropriate. 
PNC13 Pfizer-Wyeth UNK OT PNEUMO PREVNAR13 0 0 0 U 0 UNK 2 _|_Cellulitis_|_Erythema_|_Peripheral swelling_|_
They all exist in df_edits but not in df_flat_new.
Add cell_edits +42 and in status column add Deleted.
Move to df_changes_done, removing from df_edits and df_flat_prv, also from df_flat_new as no compare possible of course. '''
list_deleted_prior = []
list_prv_not_in_new = df_flat_prv.loc[ ~df_flat_prv.VAERS_ID.isin(df_flat_new.VAERS_ID) ].VAERS_ID.to_list()
if len(list_prv_not_in_new): # 12-25 deleted for real, 3 of them: [902986, 903003, 903130]
# Deleted already noted earlier, hangers-on
df_deleted_already_noted = df_edits.loc[ df_edits.VAERS_ID.isin(list_prv_not_in_new) & df_edits.status.str.contains(r'Deleted \d{4}-\d{2}-\d{2} $', na=False) ] # note end of line, $
list_deleted_prior = df_deleted_already_noted.VAERS_ID.to_list()
# Deleted new, not already noted earlier
df_deleted_to_notate_new = df_edits.loc[ df_edits.VAERS_ID.isin(list_prv_not_in_new) & ~df_edits.VAERS_ID.isin(df_deleted_already_noted.VAERS_ID) ]
list_deleted_to_notate_new = df_deleted_to_notate_new.VAERS_ID.to_list()
if len(list_deleted_to_notate_new):
df_edits.loc[ df_edits.VAERS_ID.isin(list_deleted_to_notate_new), 'status' ] += f'Deleted {date_currently} '
df_edits.loc[ df_edits.VAERS_ID.isin(list_deleted_to_notate_new), 'cell_edits' ] += len(columns_vaers)
len_newly_deleted = len(list_deleted_to_notate_new)
print(f'{ len_newly_deleted:>10} newly deleted kept: {subrange(list_deleted_to_notate_new, 6)}')
print(f'{len(list_deleted_prior):>10} deleted that were noted already: {subrange(list_deleted_prior, 6)}')
#df_edits, df_changes_done = move_rows(df_edits.loc[df_edits.VAERS_ID.isin(list_deleted_prior)], df_edits, df_changes_done)
#df_edits, df_changes_done = move_rows(df_edits.loc[df_edits.VAERS_ID.isin(list_deleted_to_notate_new)], df_edits, df_changes_done)
#df_flat_prv = df_flat_prv .loc[ ~df_flat_prv .VAERS_ID.isin(list_prv_not_in_new) ]
#df_flat_prv = df_flat_prv .loc[ ~df_flat_prv .VAERS_ID.isin(list_deleted_prior) ] # remove them from flat_prv, and they are already not in new
#df_flat_prv = df_flat_prv .loc[ ~df_flat_prv .VAERS_ID.isin(list_deleted_to_notate_new) ]
stats['deleted'] += len_newly_deleted
# These are in prv and NOT new so no compare is possible, they're done
df_edits, df_changes_done = move_rows(df_edits.loc[df_edits.VAERS_ID.isin(list_prv_not_in_new)], df_edits, df_changes_done)
df_flat_prv = df_flat_prv.loc[ ~df_flat_prv.VAERS_ID.isin(list_prv_not_in_new) ] # remove from prv
######del df_deleted_to_notate_new, list_deleted_to_notate_new
'''
1. Identical in both prv and new
2. Excess in prv
a. Deleted in new
b.
3. Excess in new
a. Brand new above highest prv ID
b. Restored
c. Gapfill
4. Both prv and new remaining
a. Changed
'''
''' Restored. Flat new that were Deleted but now being Restored
They all exist in df_edits and are marked Deleted.
Add cell_edits +42 and add status column Restored.
Remove them from df_flat_new but keep them in df_edits since there could be other changes also. '''
df_restored = df_edits.loc[ df_edits.VAERS_ID.isin(df_flat_new.VAERS_ID) & df_edits.status.str.contains(r'Deleted \d{4}-\d{2}-\d{2} $', na=False) ]
list_restored = df_restored.VAERS_ID.to_list()
if list_restored:
# WAIT, ARE THESE IN DF_EDITS OR JUST DF_FLAT_NEW?
df_edits.loc[ df_edits.VAERS_ID.isin(list_restored), 'cell_edits' ] += len(columns_vaers)
df_edits.loc[ df_edits.VAERS_ID.isin(list_restored), 'status' ] += f'Restored {date_currently} ' # Mark them as Restored
df_edits, df_changes_done = move_rows(df_edits.loc[ df_edits.VAERS_ID.isin(list_restored) ], df_edits, df_changes_done) # TODO: If changes also, those are missed here
###### WTF Do rerun #####df_flat_new = df_flat_new.loc[ ~df_flat_new.VAERS_ID.isin(list_restored) ] # remove them from df_flat_new being scrutinized
df_flat_prv = df_flat_prv.loc[ ~df_flat_prv.VAERS_ID.isin(list_restored) ] # some confusion, just removing, TODO
stats['restored'] += len(list_restored)
######del df_deleted_already_noted
''' Greater-Than. Flat new-only high VAERS_ID: df_flat_new VAERS_ID above max df_flat_prv
Set cell_edits 0 and nothing in changes or status column.
Move to df_changes_done (removing from df_flat_new) '''
list_flat_new_gt_vids = df_flat_new.loc[ df_flat_new.VAERS_ID.gt( df_flat_prv.VAERS_ID.max() ) ].VAERS_ID.to_list()
list_flat_new_Lt_vids = df_flat_new.loc[ df_flat_new.VAERS_ID.lt( df_flat_prv.VAERS_ID.max() ) ].VAERS_ID.to_list()
''' Gapfill/Throttled/Delayed/Late. Flat new-only remaining. (VAERS_IDs that didn't show up in in the sequence their drop but only later)
First appearance, late. Is possible because all greater-than prv.max() have already been moved.
Move from df_edits to df_changes_done and remove from df_flat_new '''
list_prv_vids = df_flat_prv.VAERS_ID.to_list()
list_gapfills = sorted( set(list_flat_new_Lt_vids) - set(list_prv_vids) )
list_gapfills = sorted( set(list_gapfills) - set(list_restored) ) # cludgy but seems to get the job done
if len(list_gapfills):
df_edits = df_edits.copy().sort_values(by=['status', 'changes', 'cell_edits'], ascending=False)
df_edits.loc[ df_edits.VAERS_ID.isin(list_gapfills), 'status' ] += f'Delayed {date_currently} ' # staying with the term Delayed for now
stats['gapfill'] += len(list_gapfills)
# These are in new and NOT prv so no compare is possible, they're done
df_edits, df_changes_done = move_rows(df_edits.loc[df_edits.VAERS_ID.isin(list_gapfills)], df_edits, df_changes_done)
df_flat_new = df_flat_new.loc[ ~df_flat_new.VAERS_ID.isin(list_gapfills) ] # remove from new
check_dupe_vaers_id(df_edits)
check_dupe_vaers_id(df_changes_done)
print(f'{ len(list_flat_new_gt_vids):>10} new (higher than max VAERS_ID in previous) for {date_currently}')
print(f'{ len(list_gapfills):>10} delayed this drop, filling gaps' )
print(f'{ len_newly_deleted:>10} newly deleted this drop kept' )
print(f'{ len(list_restored):>10} restored this drop {subrange(list_restored, 6)}' )
######del list_flat_new_gt_vids, list_gapfills, list_prv_not_in_new, list_restored, len_newly_deleted
len_before = len(df_flat_prv)
df_flat_prv = df_flat_prv.drop_duplicates(subset=df_flat_prv.columns, keep='last')
if len_before - len(df_flat_prv): # Should be none. Overwrite of values now done
print(f'\n\n\n\n\t\t\t\t {(len_before - len(df_flat_prv)):>10} complete duplicates dropped in df_flat_prv, SHOULD NOT HAPPEN \n\n\n\n')
len_before = len(df_flat_new)
df_flat_new = df_flat_new.drop_duplicates(subset=df_flat_new.columns, keep='last') # Should be none. Overwrite of values now done
if len_before - len(df_flat_new):
print(f'\n\n\n\n\t\t\t\t {(len_before - len(df_flat_new)):>10} complete duplicates dropped in df_flat_new, SHOULD NOT HAPPEN \n\n\n\n')
# Brand new reports only to done, no compare needed
df_only_flat_new = df_flat_new.loc[ ~df_flat_new.VAERS_ID.isin(df_flat_prv.VAERS_ID) ]
list_only_flat_new = df_only_flat_new.VAERS_ID.to_list()
if list_only_flat_new:
df_flat_new = df_flat_new.loc[ ~df_flat_new.VAERS_ID.isin(list_only_flat_new) ]
df_edits, df_changes_done = move_rows(df_edits.loc[df_edits.VAERS_ID.isin(list_only_flat_new)], df_edits, df_changes_done)
# Ensure prv and new are now the same reports, to be compared
if prv_new_error_check(df_flat_prv, df_flat_new):
pause=1 # nans in ['CAGE_YR', 'CAGE_MO', 'HOSPDAYS']
if nan_alert(df_flat_prv) or nan_alert(df_flat_new):
pause=1
''' Remaining are only common VAERS_IDs and same number of rows containing some fields that vary. '''
df_flat_prv = df_flat_prv.sort_values(by='VAERS_ID') ; df_flat_prv = df_flat_prv.reset_index(drop=True)
df_flat_new = df_flat_new.sort_values(by='VAERS_ID') ; df_flat_new = df_flat_new.reset_index(drop=True)
df_flat_new = df_flat_new.reindex(df_flat_prv.columns, axis=1) # Same column order. TODO: Find out how the columns changed order after adding the blanks block of code below
if len(df_edits):
if nan_alert(df_edits):
pause=1
len_edits = 0 # ongoing count of modified reports
vid_cols_affected = {}
warn_mixed_types(df_edits)
df_flat_prv = types_set(df_flat_prv)
warn_mixed_types(df_flat_prv)
warn_mixed_types(df_flat_new)
warn_mixed_types(df_changes_done)
list_prv_and_new_both = set( df_flat_prv.VAERS_ID.to_list() ) & set( df_flat_new.VAERS_ID.to_list() )
if not len(df_flat_new):
print(f'{str(0):>10} column changes in {len(columns_vaers)} columns')
else:
print() ; print(' Column value changes') ; print() # section could do with some refactoring smarter surely
_skip = {'cell_edits', 'status', 'changes'}
try:
if len(df_flat_prv) != len(df_flat_new):
raise ValueError(f"Row count mismatch: prv={len(df_flat_prv)} new={len(df_flat_new)}")
# Fast per-column equals() check — avoids building the expensive compare() output DataFrame
cols_changed_list = sorted([
col for col in df_flat_prv.columns
if col not in _skip and not df_flat_prv[col].equals(df_flat_new[col])
])
except Exception as e: # note error, patch and allow to continue
error(f' prv and new len diff {e} {inspect.stack()[1][2]}')
print(f'{line()} {e}')
if len(df_flat_prv) > len(df_flat_new): # TODO: Is this insane????? WHY ARE THEY DIFFERENT IN THE FIRST PLACE????
df_flat_prv = df_flat_prv.loc[ df_flat_prv.VAERS_ID.isin(df_flat_new.VAERS_ID) ]
df_flat_prv = df_flat_prv.reset_index(drop=True)
df_flat_new = df_flat_new.reset_index(drop=True)
elif len(df_flat_prv) < len(df_flat_new):
df_flat_new = df_flat_new.loc[ df_flat_new.VAERS_ID.isin(df_flat_prv.VAERS_ID) ]
df_flat_prv = df_flat_prv.reset_index(drop=True)
df_flat_new = df_flat_new.reset_index(drop=True)
cols_changed_list = sorted([
col for col in df_flat_prv.columns
if col not in _skip and not df_flat_prv[col].equals(df_flat_new[col])
])
check_dupe_vaers_id(df_flat_prv)
check_dupe_vaers_id(df_flat_new)
count_cols_altered = len(cols_changed_list)
show_col_row_progress = 0
count_cols = 0
''' Debug utility
Specify a VAERS_ID in vid_target
and/or a debug_cols to focus on. '''
# Build per-column args in parallel using a fork-based Pool.
# Forked workers inherit the full-column numpy arrays from module globals without
# pickling them over IPC — each worker receives only a tiny column-name string.
global _g_prv_arrs, _g_new_arrs, _g_ids_np_global, _g_col_date, _g_col_test
_g_prv_arrs = {col: df_flat_prv[col].to_numpy() for col in cols_changed_list}
_g_new_arrs = {col: df_flat_new[col].to_numpy() for col in cols_changed_list}
_g_ids_np_global = df_flat_prv['VAERS_ID'].to_numpy()
_g_col_date = date_currently
_g_col_test = use_test_cases
_build_w = min(NUM_CORES, len(cols_changed_list)) if cols_changed_list else 1
print(f' Building {len(cols_changed_list)} column filter masks ({_build_w} cores)...')
with Pool(_build_w) as _build_pool:
col_args = list(_build_pool.map(_build_col_arg_global, cols_changed_list))
_g_prv_arrs.clear(); _g_new_arrs.clear()
_g_ids_np_global = _g_col_date = _g_col_test = None
_total_chg_rows = sum(len(a[1]) for a in col_args)
# Sub-chunk large columns so one slow column (e.g. SYMPTOM_TEXT with 50k changes)
# doesn't starve all other cores. Target ~NUM_CORES*4 tasks total so every
# worker stays busy and load balances naturally with chunksize=1.
_row_task_cap = max(2000, _total_chg_rows // max(1, NUM_CORES * 4))
_flat_tasks = [] # list of col_arg tuples (possibly sub-chunked)
_task_labels = [] # column name for each task (for reassembly)
for _ca in col_args:
_col, _ids, _prv, _new, _dt, _tc = _ca
_n = len(_ids)
if _n <= _row_task_cap:
_flat_tasks.append(_ca)
_task_labels.append(_col)
else:
_n_sub = min(NUM_CORES, max(2, _n // _row_task_cap))
for _idx in np.array_split(np.arange(_n), _n_sub):
_flat_tasks.append((_col, _ids[_idx], _prv[_idx], _new[_idx], _dt, _tc))
_task_labels.append(_col)
_n_tasks = len(_flat_tasks)
print(f' Processing {len(col_args)} changed columns → {_n_tasks} tasks ({_total_chg_rows:,} changed-row pairs) across {NUM_CORES} cores...')
with Pool(min(NUM_CORES, _n_tasks)) as pool:
if SHOW_PROGRESS:
_raw_results = list(tqdm(
pool.imap(_compare_col_worker, _flat_tasks, chunksize=1),
total=_n_tasks, desc=" Comparing columns", unit="task", leave=True
))
else:
_raw_results = list(pool.imap(_compare_col_worker, _flat_tasks, chunksize=1))
# Merge sub-chunk results back into one result per column
from collections import defaultdict as _dd
_by_col = _dd(list)
for _lbl, _res in zip(_task_labels, _raw_results):
_by_col[_lbl].append(_res)
all_col_results = []
for _col in dict.fromkeys(_task_labels): # preserve original column order
_chunks = _by_col[_col]
if len(_chunks) == 1:
all_col_results.append(_chunks[0])
else:
_m_rup = []; _m_bb = None
_m_st = {'cells_edited': 0, 'cells_emptied': 0,
'trivial_changes_ignored': 0, 'col_count': 0}
_m_pl = []
for _, _rup, _bb, _st, _pl in _chunks:
_m_rup.extend(_rup)
if _bb:
if _m_bb is None:
_m_bb = {**_bb, 'vids': list(_bb['vids']),
'prv_vals': dict(_bb['prv_vals'])}
else:
_m_bb['vids'].extend(_bb['vids'])
_m_bb['prv_vals'].update(_bb['prv_vals'])
_m_bb['cells_emptied'] += _bb['cells_emptied']
for _k in _m_st:
_m_st[_k] += _st[_k]
_m_pl.extend(_pl)
all_col_results.append((_col, _m_rup, _m_bb, _m_st, _m_pl))
# Apply all results sequentially to df_edits
df_edits = df_edits.reset_index(drop=True)
vid_index = pd.Series(df_edits.index, index=df_edits['VAERS_ID'].values)
for col, row_updates, bulk_blank, col_stats, print_lines in all_col_results:
# Flush buffered print output from worker
for line_str in print_lines:
print(line_str)
# Accumulate stats
stats['cells_edited'] += col_stats['cells_edited']
stats['cells_emptied'] += col_stats['cells_emptied']
stats['trivial_changes_ignored'] += col_stats['trivial_changes_ignored']
if col_stats['col_count']:
stats['columns'][col] = stats['columns'].get(col, 0) + col_stats['col_count']
# Apply bulk-blank update
if bulk_blank:
vids_list = bulk_blank['vids']
prv_vals = bulk_blank['prv_vals']
bb_col = bulk_blank['col']
bb_date = bulk_blank['date']
stats['cells_emptied'] += bulk_blank['cells_emptied']
mask = df_edits['VAERS_ID'].isin(vids_list)
df_edits.loc[mask, 'cell_edits'] = df_edits.loc[mask, 'cell_edits'].astype(int) + 1
changes_delta = df_edits.loc[mask, 'VAERS_ID'].map(
{v: f"{bb_col} {bb_date}: {prv_vals.get(v, '')} <> [] " for v in vids_list}
).fillna('')
df_edits.loc[mask, 'changes'] = df_edits.loc[mask, 'changes'].astype(str) + changes_delta
# Apply per-row updates (vectorized: collect then bulk-apply)
if row_updates:
edit_idxs = []
change_map = {} # idx -> accumulated note string
val_map = {} # idx -> value to keep
for vid, edits_add, changes_note, val_to_keep in row_updates:
if vid not in vid_index.index:
continue
idx = vid_index[vid]
if edits_add:
edit_idxs.append(idx)
if changes_note:
change_map[idx] = change_map.get(idx, '') + changes_note
if val_to_keep is not None:
val_map[idx] = val_to_keep
if edit_idxs:
df_edits.loc[edit_idxs, 'cell_edits'] = (
df_edits.loc[edit_idxs, 'cell_edits'].astype(int) + 1
)
if change_map:
c_idxs = list(change_map.keys())
df_edits.loc[c_idxs, 'changes'] = (
df_edits.loc[c_idxs, 'changes'].astype(str).values
+ [change_map[i] for i in c_idxs]
)
if val_map:
v_idxs = list(val_map.keys())
df_edits.loc[v_idxs, col] = [val_map[i] for i in v_idxs]
print()
print(f"{count_cols_altered:>10} {single_plural(count_cols_altered, 'columns')} altered")
del count_cols_altered
'''
Around here ...
FutureWarning: Downcasting object dtype arrays on .fillna, .ffill, .bfill is deprecated and will change in a future version.
Call result.infer_objects(copy=False) instead.
To opt-in to the future behavior, set `pd.set_option('future.no_silent_downcasting', True)`
'''
len_before_edits = len(df_edits)
df_edits = df_edits.drop_duplicates(subset='VAERS_ID', keep='last')
if len_before_edits - len(df_edits):
print(f'{(len_before_edits - len(df_edits)):>10} duplicates removed from df_edits before final move')
len_edits += len(df_edits)
print(f"{len_edits:>10} modified {single_plural(len_edits, 'reports')} on {date_currently}") ; print()
if nan_alert(df_changes_done): # should not be necessary, TODO
pause=1
#df_changes_done['cell_edits'] = df_changes_done['cell_edits'].astype('float64').astype(int)
''' Add everything, with changes now set, into df_changes_done '''
df_edits, df_changes_done = move_rows(df_edits, df_edits, df_changes_done) # moving entire df_edits to df_changes_done
check_dupe_vaers_id(df_edits)
''' VAERS_IDs are int here '''
len_before = len(df_changes_done) # All columns
df_changes_done = df_changes_done.drop_duplicates(subset=df_changes_done.columns, keep='last') # Should be none. Overwrite of values now done
if len_before - len(df_changes_done):
print(f'{(len_before - len(df_changes_done)):>10} complete duplicates dropped in df_changes_done')
len_before = len(df_changes_done) # Just VAERS_ID, error-checking to make sure
df_changes_done = df_changes_done.drop_duplicates(subset='VAERS_ID', keep='last') # Should be none.
if len_before - len(df_changes_done):
print(f'\n\n\n\n\t\t\t\t{(len_before - len(df_changes_done)):>10} duplicate VAERS_ID dropped in df_changes_done, should never happen\n\n\n\n')
''' Sort the output by cell_edits. Section needs scrutiny.
'''
df_changes_done = df_changes_done.fillna('')
''' Annoying future warning and I don't know how best to resolve it. FutureWarning: Downcasting object dtype arrays on .fillna, .ffill, .bfill is deprecated and will change in a future version. Call result.infer_objects(copy=False) instead. To opt-in to the future behavior, set `pd.set_option('future.no_silent_downcasting', True)`
https://stackoverflow.com/questions/77900971/pandas-futurewarning-downcasting-object-dtype-arrays-on-fillna-ffill-bfill
'''
check_dupe_vaers_id(df_changes_done)
df_changes_done = df_changes_done.sort_values(by=['cell_edits', 'status', 'changes'], ascending=False)
if nan_alert(df_changes_done):
pause=1
''' VAERS_IDs with the most records/doses/lots in each report and at least one covid19.
(There's surely a better way with lambda or .apply() and/or .groupby() etc).
Here, just counting delimiters in VAX_LOT and adding 1 to that.
A report in df_vax has 18 entries in 2020-12-25 but no covid, for example, see df_vax.VAERS_ID.value_counts().unique()
vid_top_doses_list = sorted(df_doses.loc[ df_doses.doses.eq(df_doses.doses.max()) ].VAERS_ID.to_list())
TypeError: '<' not supported between instances of 'str' and 'int'
Tried df_changes_done.fillna('') above, didn't work.
Trying VAX_LOT.astype(str) <== TODO: Should be done earlier. Some must have been made in automatically when read.
'''
df_changes_done['VAX_LOT'] = df_changes_done.VAX_LOT.astype(str)
df_doses = df_changes_done.loc[df_changes_done.VAX_LOT.astype(str).str.contains(r'\|', na=False)][['VAERS_ID', 'VAX_LOT']]
if nan_alert(df_doses):
pause=1
df_doses['doses'] = df_doses.VAX_LOT.astype(str).str.count(r'\|')
filename = dir_compared + '/' + date_currently + '_VAERS_FLATFILE.csv'
print(f' Writing ... {filename}') ; print()
# When running against multiple input dates, open just the last one
do_open = 0
if files['input']['date'][-1] in filename: # Last one
do_open = 1
df_changes_done = set_columns_order(df_changes_done)
print()
print(f' This drop {do_elapsed(elapsed_drop)}')
print(f' Overall {do_elapsed(elapsed_begin)}')
print()
write_to_csv(df_changes_done, filename, open_file=do_open) # Has to be written here, the one big file. Separation of A and B later when necessary for the Excel rows limit.
# Thru this transition for clarity, df_changes_done becomes the next df_flat_prv (when processing multiple drops)
df_flat_1 = df_changes_done.copy() # use as the previous in next when processing more than one drop.
dict_done_flag[ 'changes' ] = date_currently # bookkeeping
#lookup(previous_date, date_currently, vid_cols_affected) # for debug, open a csv showing all of the reports touched for these dates compared
vid_top_doses_list = sorted(df_doses.loc[ df_doses.doses.eq(df_doses.doses.max()) ].VAERS_ID.to_list())
top_num = df_doses.doses.max() + 1
print(f'{len(vid_top_doses_list):>10} {single_plural(len(vid_top_doses_list), "reports")} with the most ({top_num}) records/lots/doses: {" ".join([str(x) for x in vid_top_doses_list])}')
#print(f"{stats['comparisons']:>10} {single_plural(stats['comparisons'], 'comparisons')} done")
vids_present = df_both_flat_inputs.VAERS_ID.to_list()
do_never_ever( vids_present, date_currently, 'compare on df_both_flat_inputs' )
do_ever_covid( vids_present )
stats_resolve( date_currently )
''' Ensure nothing is lost '''
verify_all_reports_present(df_both_flat_inputs, df_changes_done) # Relies on collection of IDs, new, never, ever seen
return
# Integrated from original: open_files()
def open_files(_date): # like './vaers_drop_inputs/2020-12-25'
    ''' Stage the input files of one drop date in dir_working.

        Input files in dir_input:
            csv within directories
            zip files in a single directory, containing csv, treated as if they are folders, sort of.

        Returns True when the caller may carry on with this date (files staged here, or the
        date was already consolidated or flattened) and False when files_from_zip() reported
        a failure, in which case the date should be skipped. Calls exit() if the date has no
        entry in files['input']['keyval'] or the entry matches none of the handled kinds. '''
    files_populate_information()

    if _date in files['consolidated']['date']:
        # Nothing to copy: leave an empty working directory that carries only the date marker.
        print(f' {_date} already consolidated, no need to copy input files to dir_working')
        shutil.rmtree(dir_working)
        os.mkdir(dir_working)
        set_files_date_marker(dir_working, _date) # a flag used by consolidate()
        return True

    if _date in files['flattened']['date']:
        # The working directory is left untouched in this case.
        print(f' Skipping unzip because flattened for {_date} already exists')
        return True

    inputs_by_date = files['input']['keyval']
    if _date not in inputs_by_date:
        exit(f" Failed to find in files['input']['keyval'] the _date {_date} in open_files() ")
    entry = inputs_by_date[_date]

    # The order of these checks matters: 'csv' is tried first, then list, then 'zip'.
    if 'csv' in entry:
        print(f' Copy all {_date} to {dir_working}')
        dated_paths = [path for path in files['input']['files'] if _date in path]
        shutil.rmtree(dir_working)
        os.mkdir(dir_working)
        for path in dated_paths:
            shutil.copy(path, dir_working)
    elif isinstance(entry, list): # another case earlier in development, csv files already extracted manually
        print(f' Copy {_date}/* to {dir_working}')
        shutil.rmtree(dir_working) # copytree() needs the destination to not exist yet
        shutil.copytree(_date, dir_working)
        set_files_date_marker(dir_working, _date)
    elif 'zip' in entry: # zip file, treat it sort of like a directory here
        shutil.rmtree(dir_working)
        os.makedirs(dir_working)
        if not files_from_zip(entry, dir_working):
            print(f' Skipping date {_date} due to zip file error')
            return False # this date should be skipped
    else:
        exit(f' Unexpected _date {_date} in open_files() ')

    set_files_date_marker(dir_working, _date) # a visual aid showing input file date marker, not really a file so much
    return True
# Integrated from original: verify_all_reports_present()
def verify_all_reports_present(df_compare_input, df_compare_output): # Ensure nothing is lost
    ''' Report every VAERS_ID mismatch between the compare input and the compare output.

        Three checks are printed when they find something:
          1. IDs present in df_compare_input but absent from df_compare_output.
          2. IDs in the output versus the keys of the module-level ever_covid mapping, both directions.
          3. Orphans (input only) and extras (output only); each non-empty group is also
             written to 'orphans.csv' / 'extras.csv' via write_to_csv().
        Nothing is returned. '''
    input_not_in_output = ~df_compare_input.VAERS_ID.isin(df_compare_output.VAERS_ID)
    missing_vids = df_compare_input.loc[input_not_in_output].VAERS_ID.to_list()
    if missing_vids:
        print(f'\n\n\nERROR:\n{len(missing_vids)} VAERS_IDs in df_compare_input NOT in df_compare_output \n {subrange(missing_vids, 6)} \n\n')

    # Output IDs against everything recorded in ever_covid
    output_vids = set(df_compare_output.VAERS_ID.to_list())
    covid_vids = set(ever_covid.keys())
    only_in_output = output_vids - covid_vids
    only_in_ever_covid = covid_vids - output_vids
    if only_in_output:
        print(f'{len(only_in_output):>10} vids_in_df_not_in_ever_covid: {subrange(only_in_output, 6)}')
        print()
    if only_in_ever_covid:
        print(f'{len(only_in_ever_covid):>10} vids_not_in_df_in_ever_covid: {subrange(only_in_ever_covid, 6)}')
        print()

    # Make certain none lost or extra
    input_vids = set(df_compare_input.VAERS_ID.to_list())
    orphan_vids = input_vids - output_vids
    extra_vids = output_vids - input_vids

    if orphan_vids:
        df_orphans = df_compare_input.loc[input_not_in_output]
        print(f'\n\n\n{len(df_orphans):>10} orphans, records in df_compare_input that are not in df_compare_output')
        print(' M U S T F I X \n\n\n')
        write_to_csv(df_orphans, 'orphans.csv', open_file=1)
        # NOTE(review): this rebinds only the local name; the function returns nothing,
        # so the caller's output frame does not receive the moved rows.
        df_orphans, df_compare_output = move_rows(df_orphans, df_orphans, df_compare_output) # moving all of them out to done

    if extra_vids:
        output_not_in_input = ~df_compare_output.VAERS_ID.isin(df_compare_input.VAERS_ID)
        df_extras = df_compare_output.loc[output_not_in_input]
        print(f'\n\n\n{len(df_extras):>10} extras, records in df_compare_output that are not in df_compare_input')
        print(f' M U S T F I X {subrange(df_extras.VAERS_ID.to_list(), 6)} \n\n\n')
        write_to_csv(df_extras, 'extras.csv', open_file=1)
# Integrated from original: show_vid_as_text()
def show_vid_as_text(df, vid):
    ''' Debug aid: dump the row(s) of one VAERS_ID to '<vid>.txt' and launch that file.

        df  : dataframe with a VAERS_ID column.
        vid : the VAERS_ID to show. A message is printed when it is not an integer, but the
              lookup is still attempted.

        Prints a message and returns without writing anything when vid is not in df.VAERS_ID.
        Otherwise the matching rows are written as comma-separated text (utf-8-sig, no index)
        to '<vid>.txt' in the current directory and the filename is handed to the shell. '''
    # isinstance() instead of type(): IDs taken from a dataframe or numpy array are numpy
    # integers (np.int64 etc.), which the exact-type test wrongly reported as non-integers.
    if not isinstance(vid, (int, np.integer)):
        print(f' Expected vid {vid} to be an integer')
    if vid not in df.VAERS_ID.to_list():
        print(f' vid {vid} not found in the dataframe')
        return
    filename = f'{vid}.txt'
    df.loc[ df.VAERS_ID.eq(vid) ].to_csv(filename, sep=',', encoding='utf-8-sig', index=False)
    # NOTE(review): shell=True passes the bare filename to the shell as a command line.
    # Presumably this relies on the platform opening .txt files by association - confirm
    # on non-Windows systems.
    sp.Popen(filename, shell=True)
# =============================================================================
# FINAL MERGED FILE CREATION
# =============================================================================
def create_final_merged_file():
    """
    Create a final merged file from the latest FLATFILE in 2_vaers_full_compared.
    This file contains all records with their complete change history.

    The newest '*_VAERS_FLATFILE.csv' in dir_compared (by sorted filename) is read and
    written to '<dir_top>/VAERS_FINAL_MERGED.csv' through write_to_csv(), followed by a
    short printed summary. If no such file exists, '*_VAERS_FLATFILE_A.csv' is tried.
    Any exception is reported through error() with a traceback and is not re-raised.
    """
    print()
    print("="*80)
    print("CREATING FINAL MERGED OUTPUT FILE")
    print("="*80)
    print()

    # Find the latest FLATFILE
    # NOTE(review): in the fallback only the single '_A' file is read; if a drop was split
    # into several parts, the other parts are not merged here - confirm this is intended.
    flatfiles = glob.glob(f'{dir_compared}/*_VAERS_FLATFILE.csv')
    if not flatfiles:
        flatfiles = glob.glob(f'{dir_compared}/*_VAERS_FLATFILE_A.csv')
    if not flatfiles:
        print(" No FLATFILE found in 2_vaers_full_compared directory")
        return

    # Filenames start with the drop date, so the greatest name is the latest drop
    latest_file = max(flatfiles)
    latest_name = os.path.basename(latest_file)
    print(f" Using latest file: {latest_name}")

    try:
        # Read the latest FLATFILE
        print(f" Reading {latest_name}...")
        df_final = pd.read_csv(latest_file, encoding='utf-8-sig', low_memory=False)
        print(f" Total records: {len(df_final):,}")

        # Fixed output name, overwritten on every run
        output_filename = f'{dir_top}/VAERS_FINAL_MERGED.csv'

        # Write the final merged file
        print(f" Writing final merged file: {output_filename}")
        write_to_csv(df_final, output_filename, open_file=False)
        print()
        print(" ✓ Final merged file created successfully!")
        print(f" Location: {output_filename}")
        print(f" Records: {len(df_final):,}")

        # Print summary statistics
        if 'cell_edits' in df_final.columns:
            total_edits = df_final['cell_edits'].sum()
            records_with_edits = len(df_final[df_final['cell_edits'] > 0])
            print(f" Total cell edits: {total_edits:,}")
            print(f" Records with edits: {records_with_edits:,}")
        if 'status' in df_final.columns:
            # When every status cell is blank, read_csv yields an all-NaN float column and
            # the .str accessor raises AttributeError; that used to be reported as a failed
            # merge although the file had already been written. Casting to str avoids it
            # ('nan' never matches 'Deleted').
            is_deleted = df_final['status'].astype(str).str.contains('Deleted', na=False)
            deleted_count = len(df_final[is_deleted])
            if deleted_count > 0:
                print(f" Deleted records: {deleted_count:,}")
        print()
    except Exception as e:
        error(f"Failed to create final merged file: {e}")
        print(f" Error: {e}")
        import traceback
        traceback.print_exc()
# =============================================================================
# MAIN EXECUTION
# =============================================================================
def run_all():
    """Main execution function with all enhancements.

    Prints the banner, then either builds only the final merged file (args.merge_only) or
    validates the directories and processes every remaining drop date in turn:
    open_files() -> consolidate() -> flatten() -> compare(). A date for which open_files()
    returns False is appended to files['done'] and skipped. A sticky header and an overall
    progress bar (when SHOW_PROGRESS) show progress with a rolling ETA. After the loop the
    final merged file is created and the error summary is printed.

    The progress bar and the sticky header are closed in a ``finally`` clause so they are
    also closed when a drop raises or the run is interrupted; the exception itself still
    propagates to the caller.
    """
    print(__file__)
    print()
    print('='*80)
    print('VAERS Enhanced Processing - Complete Edition')
    print('='*80)
    print()
    print('Features:')
    for feature_line in (
        ' ✓ Multi-core parallel processing',
        ' ✓ Memory-efficient chunked data handling',
        ' ✓ Command-line dataset selection (COVID/Full)',
        ' ✓ Progress bars for all operations',
        ' ✓ Comprehensive error tracking',
        ' ✓ Fixed stats functionality',
        ' ✓ All original processing logic integrated',
    ):
        print(feature_line)
    print()

    # If merge-only flag is set, just create the final file and exit
    if args.merge_only:
        print('='*80)
        print("MERGE-ONLY MODE: Creating final merged file")
        print("="*80)
        print()
        create_final_merged_file()
        return

    if not validate_dirs_and_files():
        exit_script("Directory validation failed")

    # Determine total file count and how many are already done (for resume)
    files_populate_information()
    total_input_files = len(files['input']['date'])
    _done_set = set(files['changes']['date'])
    files_already_done = sum(1 for d in files['input']['date'] if d in _done_set)

    print()
    print("="*80)
    if files_already_done:
        print(f"RESUMING PROCESSING (skipping {files_already_done} already-completed drops)")
    else:
        print("STARTING PROCESSING")
    print("="*80)
    print()

    # Sticky header: row 1 of the terminal is reserved as a persistent status bar.
    # All other output scrolls in rows 2..N so the status is never pushed off-screen.
    _tty = _run_tee._orig if _run_tee and hasattr(_run_tee, '_orig') else sys.stdout
    _sticky = _StickyHeader(_tty)

    overall_pbar = None
    if SHOW_PROGRESS and total_input_files > 0:
        overall_pbar = tqdm(
            total=total_input_files,
            initial=files_already_done,
            desc="Overall",
            unit="file",
            file=_tty, # write directly to terminal, not through Tee
            bar_format='{desc}: {n_fmt}/{total_fmt} [{bar}] {percentage:3.0f}% '
                       'elapsed {elapsed} ETA {remaining} {postfix}',
            position=0,
            dynamic_ncols=True,
        )

    file_num = files_already_done # start counter at the correct offset on resume
    file_times = [] # wall-clock seconds for each completed file
    _INV, _RST = '\033[7m', '\033[0m' # ANSI reverse video on / attributes off

    def _eta_summary(completed, total, times):
        """Return a short adaptive-ETA string from per-file timings ('' when there are none)."""
        if not times:
            return ''
        # Rolling average over at most the 5 most recent files (more weight on recent)
        recent = times[-5:]
        avg = sum(recent) / len(recent)
        eta_sec = avg * (total - completed)
        h = int(eta_sec // 3600)
        m = int((eta_sec % 3600) // 60)
        s = int(eta_sec % 60)
        if h:
            eta_str = f'{h}h{m:02d}m'
        elif m:
            eta_str = f'{m}m{s:02d}s'
        else:
            eta_str = f'{s}s'
        avg_str = f'{avg:.0f}s/file' if avg < 120 else f'{avg/60:.1f}m/file'
        return f'~{eta_str} remaining ({avg_str} rolling avg, {len(times)} samples)'

    try:
        # Main processing loop
        while more_to_do():
            this_drop_date = get_next_date()
            if this_drop_date is None:
                break

            file_num += 1
            file_start = _time.time()
            files_left = total_input_files - file_num
            _sticky.update(f'File {file_num}/{total_input_files} '
                           f'({this_drop_date}) — {files_left} left')
            if overall_pbar:
                overall_pbar.set_postfix_str(f'{this_drop_date} [{file_num}/{total_input_files}]')

            print_date_banner(this_drop_date, file_num, total_input_files)
            stats_initialize(this_drop_date)

            # Try to open files, skip this date if it fails.
            # Only an explicit False counts as failure, as before.
            if open_files(this_drop_date) is False:
                print(f' Skipping processing for {this_drop_date} due to file errors')
                files['done'].append(this_drop_date) # Mark as done to move to next
                file_times.append(_time.time() - file_start)
                if overall_pbar:
                    overall_pbar.update(1)
                    overall_pbar.set_postfix_str(_eta_summary(file_num, total_input_files, file_times))
                continue

            consolidate(this_drop_date)
            flatten(this_drop_date)
            compare(this_drop_date)

            file_times.append(_time.time() - file_start)
            eta_line = _eta_summary(file_num, total_input_files, file_times)
            done_line = (f'File {_INV}{file_num}/{total_input_files}{_RST} '
                         f'({this_drop_date}) done in {do_elapsed(file_start)}'
                         f' — {_INV}{files_left} left{_RST} {eta_line}')
            print(f'\n {done_line}')

            # Keep sticky header current with the just-completed file
            _sticky.update(f'DONE {file_num}/{total_input_files} '
                           f'({this_drop_date}) — {files_left} left {eta_line}')
            if overall_pbar:
                overall_pbar.update(1)
                overall_pbar.set_postfix_str(eta_line)
    finally:
        # Runs on normal completion, on an exception in any stage and on Ctrl-C, so the
        # progress bar and the sticky header are always closed.
        if overall_pbar:
            overall_pbar.close()
        _sticky.close()

    # Create final merged output file
    create_final_merged_file()

    elapsed = do_elapsed(elapsed_begin)
    print()
    print("="*80)
    print(f"PROCESSING COMPLETE - Total time: {elapsed}")
    print("="*80)
    print()

    # Print final error summary
    print_errors_summary()
if __name__ == "__main__":
    # Script entry point: run the whole pipeline. Ctrl-C and any unexpected exception
    # both finish through exit_script(); the latter is first recorded with error()
    # and its traceback is printed.
    try:
        run_all()
    except KeyboardInterrupt:
        print("\n\nProcess interrupted by user")
        exit_script()
    except Exception as exc:
        error(f"Unexpected error in main: {type(exc).__name__}: {exc}")
        from traceback import print_exc
        print_exc()
        exit_script()