batch_crop_resize.py
· 7.2 KiB · Python
Raw
#!/usr/bin/env python3
"""
Fast parallel image processing script to crop whitespace and resize images.
Removes whitespace from the left and right edges and upscales images to meet a minimum height and/or width.
Uses multiprocessing for high-speed batch processing of large image collections.
"""
import os
import sys
from pathlib import Path
from PIL import Image, ImageChops
import argparse
from multiprocessing import Pool, cpu_count
from functools import partial
from tqdm import tqdm
# Allow processing of very large images.
# NOTE: setting MAX_IMAGE_PIXELS to None switches off Pillow's pixel-count
# limit (its decompression-bomb check), so a crafted file can exhaust memory.
# Only run this script on image collections you trust.
Image.MAX_IMAGE_PIXELS = None
def trim_whitespace(img, tolerance=10):
    """
    Remove whitespace from the left and right sides of an image.

    The image is compared against a solid white canvas; columns whose
    difference from white stays below ``tolerance`` on both outer edges are
    cropped away. The top and bottom edges are never cropped.

    Transparent images are flattened onto a white background first, so fully
    transparent margins count as whitespace instead of turning into whatever
    colour is stored underneath the alpha channel.

    Args:
        img: PIL Image object
        tolerance: How much variation from white to accept (0-255)

    Returns:
        PIL Image object in RGB mode, cropped horizontally when any
        non-white content was found; otherwise the (RGB) image uncropped.
    """
    if 'A' in img.getbands() or 'transparency' in img.info:
        # Composite onto white using the alpha channel as the paste mask;
        # a plain convert('RGB') would simply discard the alpha values.
        rgba = img.convert('RGBA')
        flattened = Image.new('RGB', rgba.size, (255, 255, 255))
        flattened.paste(rgba, mask=rgba.split()[3])
        img = flattened
    elif img.mode != 'RGB':
        img = img.convert('RGB')

    # Per-pixel distance from pure white, reduced to a single channel.
    white = Image.new('RGB', img.size, (255, 255, 255))
    distance = ImageChops.difference(img, white).convert('L')

    # Binarise: anything closer to white than the tolerance becomes 0 so
    # that getbbox() ignores it.
    content_mask = distance.point(lambda value: 0 if value < tolerance else 255)
    bbox = content_mask.getbbox()
    if bbox is None:
        # Nothing but whitespace: leave the image as it is.
        return img

    # Only the horizontal extent of the bounding box is used.
    left, _, right, _ = bbox
    return img.crop((left, 0, right, img.height))
def resize_to_min_dimensions(img, min_height=1400, min_width=None):
    """
    Upscale an image so it meets the minimum dimensions, keeping aspect ratio.

    Images that already satisfy every requested minimum are returned
    unchanged; this function never downsizes.

    Args:
        img: PIL Image object
        min_height: Minimum height in pixels (None or 0 disables the check)
        min_width: Minimum width in pixels (None or 0 disables the check)

    Returns:
        The original image when no scaling is needed, otherwise a new
        image resized with the LANCZOS filter.
    """
    width, height = img.size

    # Take the larger of the two required scale factors so both minimums
    # are satisfied by a single uniform resize.
    scale = 1.0
    if min_height and height < min_height:
        scale = max(scale, min_height / height)
    if min_width and width < min_width:
        scale = max(scale, min_width / width)

    if scale <= 1.0:
        return img

    # int() truncates, and (min / size) * size can come out a hair below
    # `min` in floating point; clamp so the result never misses the
    # requested minimum by one pixel.
    new_width = max(int(width * scale), min_width or 0)
    new_height = max(int(height * scale), min_height or 0)
    return img.resize((new_width, new_height), Image.LANCZOS)
def process_image(input_path, output_dir, min_height=1400, min_width=None, tolerance=10):
    """
    Process a single image: crop side whitespace, upscale, and save.

    The result is written to ``output_dir`` under the same file name as the
    input. Failures are reported through the return value rather than
    raised, so one bad file does not abort a whole batch.

    Args:
        input_path: Path to input image
        output_dir: Directory to save processed image
        min_height: Minimum height in pixels
        min_width: Minimum width in pixels (optional)
        tolerance: Whitespace detection tolerance

    Returns:
        Tuple of (success: bool, filename: str, dimensions: tuple, error: str).
        ``dimensions`` is None on failure; ``error`` is None on success.
    """
    source = Path(input_path)
    try:
        # The context manager closes the underlying file handle even when
        # processing fails, so long-lived pool workers do not leak handles.
        with Image.open(source) as original:
            processed = trim_whitespace(original, tolerance)
            processed = resize_to_min_dimensions(processed, min_height, min_width)

            # Save while the source is still open: `processed` may be the
            # very same (lazily loaded) image object as `original`.
            processed.save(Path(output_dir) / source.name, quality=95, optimize=True)
            final_size = processed.size
    except Exception as exc:
        # Boundary of the worker task: turn any failure into a result row.
        return (False, source.name, None, str(exc))

    return (True, source.name, final_size, None)
def process_directory(input_dir, output_dir, min_height=1400, min_width=None, tolerance=10, workers=None):
    """
    Process all images in a directory using parallel processing.

    Only regular files directly inside ``input_dir`` whose extension is a
    supported image format are processed (no recursion). Each file is handed
    to ``process_image`` in a worker pool, and a per-file report plus a
    summary is printed afterwards.

    Args:
        input_dir: Input directory path
        output_dir: Output directory path (created if missing)
        min_height: Minimum height in pixels
        min_width: Minimum width in pixels (optional)
        tolerance: Whitespace detection tolerance
        workers: Number of parallel workers (None = auto-detect)
    """
    # Create output directory if it doesn't exist
    Path(output_dir).mkdir(parents=True, exist_ok=True)

    # Supported image formats ('.tif' is the common short form of '.tiff')
    image_extensions = {'.jpg', '.jpeg', '.png', '.bmp', '.gif', '.tif', '.tiff', '.webp'}

    # iterdir() yields entries in arbitrary order; sort for a stable report.
    image_files = sorted(
        entry for entry in Path(input_dir).iterdir()
        if entry.is_file() and entry.suffix.lower() in image_extensions
    )

    if not image_files:
        print(f"No image files found in {input_dir}")
        return

    # Determine number of workers
    if workers is None:
        workers = max(1, cpu_count() - 1)  # Leave one core free
    # Pool() rejects values below 1, and extra workers beyond the number of
    # images would only sit idle.
    workers = max(1, min(workers, len(image_files)))

    print(f"Found {len(image_files)} images to process")
    print(f"Using {workers} parallel workers\n")

    # Bind the fixed parameters so the pool only has to pass the file path.
    process_func = partial(
        process_image,
        output_dir=output_dir,
        min_height=min_height,
        min_width=min_width,
        tolerance=tolerance
    )

    # imap keeps results in input order while letting tqdm tick per image.
    with Pool(processes=workers) as pool:
        results = list(tqdm(
            pool.imap(process_func, image_files),
            total=len(image_files),
            desc="Processing images",
            unit="img"
        ))

    # Print results
    success_count = 0
    error_count = 0
    print("\n" + "="*60)
    for success, filename, dimensions, error in results:
        if success:
            success_count += 1
            print(f"✓ {filename} -> {dimensions[0]}x{dimensions[1]}")
        else:
            error_count += 1
            print(f"✗ {filename}: {error}")

    print("="*60)
    print("\nProcessing complete!")
    print(f" Success: {success_count}/{len(image_files)}")
    if error_count > 0:
        print(f" Errors: {error_count}/{len(image_files)}")
    print(f" Output: {output_dir}")
def main():
    """
    Command-line entry point.

    Parses the arguments, checks that the input path is an existing
    directory, and hands everything to ``process_directory``. Exits with
    status 1 when the input directory is missing or is not a directory.
    """
    parser = argparse.ArgumentParser(
        description='Crop whitespace and resize images to minimum dimensions (parallel processing)'
    )
    parser.add_argument(
        'input_dir',
        help='Input directory containing images'
    )
    parser.add_argument(
        'output_dir',
        help='Output directory for processed images'
    )
    parser.add_argument(
        '--min-height',
        type=int,
        default=1400,
        help='Minimum height in pixels (default: 1400)'
    )
    parser.add_argument(
        '--min-width',
        type=int,
        default=None,
        help='Minimum width in pixels (optional)'
    )
    parser.add_argument(
        '--tolerance',
        type=int,
        default=10,
        help='Whitespace detection tolerance 0-255 (default: 10)'
    )
    parser.add_argument(
        '--workers',
        type=int,
        default=None,
        help=f'Number of parallel workers (default: auto = {max(1, cpu_count() - 1)})'
    )

    args = parser.parse_args()

    # Validate input directory. is_dir() (rather than exists()) also rejects
    # a regular file, which would otherwise fail later when it is listed.
    if not Path(args.input_dir).is_dir():
        print(
            f"Error: Input directory '{args.input_dir}' does not exist or is not a directory",
            file=sys.stderr
        )
        sys.exit(1)

    # Process images
    process_directory(
        args.input_dir,
        args.output_dir,
        args.min_height,
        args.min_width,
        args.tolerance,
        args.workers
    )
# Run the CLI only when this file is executed as a script. The guard also
# matters for multiprocessing: worker processes started with the "spawn"
# method re-import this module and must not re-enter main().
if __name__ == "__main__":
    main()
| 1 | #!/usr/bin/env python3 |
| 2 | """ |
| 3 | Fast parallel image processing script to crop whitespace and resize images. |
| 4 | Removes white space from left and right, ensures minimum height and/or width. |
| 5 | Uses multiprocessing for high-speed batch processing of large image collections. |
| 6 | """ |
| 7 | |
| 8 | import os |
| 9 | import sys |
| 10 | from pathlib import Path |
| 11 | from PIL import Image, ImageChops |
| 12 | import argparse |
| 13 | from multiprocessing import Pool, cpu_count |
| 14 | from functools import partial |
| 15 | from tqdm import tqdm |
| 16 | |
| 17 | # Allow processing of very large images |
| 18 | Image.MAX_IMAGE_PIXELS = None |
| 19 | |
| 20 | |
| 21 | def trim_whitespace(img, tolerance=10): |
| 22 | """ |
| 23 | Remove whitespace from left and right sides of image. |
| 24 | |
| 25 | Args: |
| 26 | img: PIL Image object |
| 27 | tolerance: How much variation from white to accept (0-255) |
| 28 | |
| 29 | Returns: |
| 30 | Cropped PIL Image object |
| 31 | """ |
| 32 | # Convert to RGB if needed |
| 33 | if img.mode != 'RGB': |
| 34 | img = img.convert('RGB') |
| 35 | |
| 36 | # Create a background image that's solid white |
| 37 | bg = Image.new('RGB', img.size, (255, 255, 255)) |
| 38 | |
| 39 | # Get difference between image and white background |
| 40 | diff = ImageChops.difference(img, bg) |
| 41 | |
| 42 | # Convert to grayscale and get bounding box |
| 43 | diff = diff.convert('L') |
| 44 | |
| 45 | # Apply tolerance - values below tolerance become 0 (considered white) |
| 46 | bbox = diff.point(lambda x: 0 if x < tolerance else 255).getbbox() |
| 47 | |
| 48 | if bbox: |
| 49 | # Crop to remove whitespace on left and right only |
| 50 | left, top, right, bottom = bbox |
| 51 | # Keep original top and bottom, only crop left and right |
| 52 | return img.crop((left, 0, right, img.height)) |
| 53 | |
| 54 | return img |
| 55 | |
| 56 | |
| 57 | def resize_to_min_dimensions(img, min_height=1400, min_width=None): |
| 58 | """ |
| 59 | Resize image to ensure minimum dimensions while maintaining aspect ratio. |
| 60 | |
| 61 | Args: |
| 62 | img: PIL Image object |
| 63 | min_height: Minimum height in pixels |
| 64 | min_width: Minimum width in pixels (optional) |
| 65 | |
| 66 | Returns: |
| 67 | Resized PIL Image object |
| 68 | """ |
| 69 | width, height = img.size |
| 70 | |
| 71 | # Calculate scale factors needed for each dimension |
| 72 | scale_height = min_height / height if height < min_height else 1.0 |
| 73 | scale_width = min_width / width if min_width and width < min_width else 1.0 |
| 74 | |
| 75 | # Use the larger scale to ensure both minimums are met |
| 76 | scale = max(scale_height, scale_width) |
| 77 | |
| 78 | if scale > 1.0: |
| 79 | new_width = int(width * scale) |
| 80 | new_height = int(height * scale) |
| 81 | img = img.resize((new_width, new_height), Image.LANCZOS) |
| 82 | |
| 83 | return img |
| 84 | |
| 85 | |
| 86 | def process_image(input_path, output_dir, min_height=1400, min_width=None, tolerance=10): |
| 87 | """ |
| 88 | Process a single image: crop whitespace and resize. |
| 89 | |
| 90 | Args: |
| 91 | input_path: Path to input image |
| 92 | output_dir: Directory to save processed image |
| 93 | min_height: Minimum height in pixels |
| 94 | min_width: Minimum width in pixels (optional) |
| 95 | tolerance: Whitespace detection tolerance |
| 96 | |
| 97 | Returns: |
| 98 | Tuple of (success: bool, filename: str, dimensions: tuple, error: str) |
| 99 | """ |
| 100 | try: |
| 101 | # Open image |
| 102 | img = Image.open(input_path) |
| 103 | |
| 104 | # Trim whitespace from left and right |
| 105 | img = trim_whitespace(img, tolerance) |
| 106 | |
| 107 | # Resize to ensure minimum dimensions |
| 108 | img = resize_to_min_dimensions(img, min_height, min_width) |
| 109 | |
| 110 | # Create output path |
| 111 | output_path = Path(output_dir) / Path(input_path).name |
| 112 | |
| 113 | # Save processed image |
| 114 | img.save(output_path, quality=95, optimize=True) |
| 115 | |
| 116 | return (True, Path(input_path).name, img.size, None) |
| 117 | |
| 118 | except Exception as e: |
| 119 | return (False, Path(input_path).name, None, str(e)) |
| 120 | |
| 121 | |
| 122 | def process_directory(input_dir, output_dir, min_height=1400, min_width=None, tolerance=10, workers=None): |
| 123 | """ |
| 124 | Process all images in a directory using parallel processing. |
| 125 | |
| 126 | Args: |
| 127 | input_dir: Input directory path |
| 128 | output_dir: Output directory path |
| 129 | min_height: Minimum height in pixels |
| 130 | min_width: Minimum width in pixels (optional) |
| 131 | tolerance: Whitespace detection tolerance |
| 132 | workers: Number of parallel workers (None = auto-detect) |
| 133 | """ |
| 134 | # Create output directory if it doesn't exist |
| 135 | Path(output_dir).mkdir(parents=True, exist_ok=True) |
| 136 | |
| 137 | # Supported image formats |
| 138 | image_extensions = {'.jpg', '.jpeg', '.png', '.bmp', '.gif', '.tiff', '.webp'} |
| 139 | |
| 140 | # Get all image files |
| 141 | input_path = Path(input_dir) |
| 142 | image_files = [ |
| 143 | f for f in input_path.iterdir() |
| 144 | if f.is_file() and f.suffix.lower() in image_extensions |
| 145 | ] |
| 146 | |
| 147 | if not image_files: |
| 148 | print(f"No image files found in {input_dir}") |
| 149 | return |
| 150 | |
| 151 | # Determine number of workers |
| 152 | if workers is None: |
| 153 | workers = max(1, cpu_count() - 1) # Leave one core free |
| 154 | |
| 155 | print(f"Found {len(image_files)} images to process") |
| 156 | print(f"Using {workers} parallel workers\n") |
| 157 | |
| 158 | # Create partial function with fixed parameters |
| 159 | process_func = partial( |
| 160 | process_image, |
| 161 | output_dir=output_dir, |
| 162 | min_height=min_height, |
| 163 | min_width=min_width, |
| 164 | tolerance=tolerance |
| 165 | ) |
| 166 | |
| 167 | # Process images in parallel with progress bar |
| 168 | success_count = 0 |
| 169 | error_count = 0 |
| 170 | |
| 171 | with Pool(processes=workers) as pool: |
| 172 | results = list(tqdm( |
| 173 | pool.imap(process_func, image_files), |
| 174 | total=len(image_files), |
| 175 | desc="Processing images", |
| 176 | unit="img" |
| 177 | )) |
| 178 | |
| 179 | # Print results |
| 180 | print("\n" + "="*60) |
| 181 | for success, filename, dimensions, error in results: |
| 182 | if success: |
| 183 | success_count += 1 |
| 184 | print(f"✓ {filename} -> {dimensions[0]}x{dimensions[1]}") |
| 185 | else: |
| 186 | error_count += 1 |
| 187 | print(f"✗ {filename}: {error}") |
| 188 | |
| 189 | print("="*60) |
| 190 | print(f"\nProcessing complete!") |
| 191 | print(f" Success: {success_count}/{len(image_files)}") |
| 192 | if error_count > 0: |
| 193 | print(f" Errors: {error_count}/{len(image_files)}") |
| 194 | print(f" Output: {output_dir}") |
| 195 | |
| 196 | |
| 197 | def main(): |
| 198 | parser = argparse.ArgumentParser( |
| 199 | description='Crop whitespace and resize images to minimum dimensions (parallel processing)' |
| 200 | ) |
| 201 | parser.add_argument( |
| 202 | 'input_dir', |
| 203 | help='Input directory containing images' |
| 204 | ) |
| 205 | parser.add_argument( |
| 206 | 'output_dir', |
| 207 | help='Output directory for processed images' |
| 208 | ) |
| 209 | parser.add_argument( |
| 210 | '--min-height', |
| 211 | type=int, |
| 212 | default=1400, |
| 213 | help='Minimum height in pixels (default: 1400)' |
| 214 | ) |
| 215 | parser.add_argument( |
| 216 | '--min-width', |
| 217 | type=int, |
| 218 | default=None, |
| 219 | help='Minimum width in pixels (optional)' |
| 220 | ) |
| 221 | parser.add_argument( |
| 222 | '--tolerance', |
| 223 | type=int, |
| 224 | default=10, |
| 225 | help='Whitespace detection tolerance 0-255 (default: 10)' |
| 226 | ) |
| 227 | parser.add_argument( |
| 228 | '--workers', |
| 229 | type=int, |
| 230 | default=None, |
| 231 | help=f'Number of parallel workers (default: auto = {max(1, cpu_count() - 1)})' |
| 232 | ) |
| 233 | |
| 234 | args = parser.parse_args() |
| 235 | |
| 236 | # Validate input directory |
| 237 | if not Path(args.input_dir).exists(): |
| 238 | print(f"Error: Input directory '{args.input_dir}' does not exist") |
| 239 | sys.exit(1) |
| 240 | |
| 241 | # Process images |
| 242 | process_directory( |
| 243 | args.input_dir, |
| 244 | args.output_dir, |
| 245 | args.min_height, |
| 246 | args.min_width, |
| 247 | args.tolerance, |
| 248 | args.workers |
| 249 | ) |
| 250 | |
| 251 | |
| 252 | if __name__ == '__main__': |
| 253 | main() |
| 254 |