| """ |
| https://github.com/Trinkle23897/Fast-Poisson-Image-Editing |
| MIT License |
| |
| Copyright (c) 2022 Jiayi Weng |
| |
| Permission is hereby granted, free of charge, to any person obtaining a copy |
| of this software and associated documentation files (the "Software"), to deal |
| in the Software without restriction, including without limitation the rights |
| to use, copy, modify, merge, publish, distribute, sublicense, and/or sell |
| copies of the Software, and to permit persons to whom the Software is |
| furnished to do so, subject to the following conditions: |
| |
| The above copyright notice and this permission notice shall be included in all |
| copies or substantial portions of the Software. |
| |
| THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR |
| IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, |
| FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE |
| AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER |
| LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, |
| OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE |
| SOFTWARE. |
| """ |
|
|
| import time |
| import argparse |
| import os |
| import fpie |
| from process import ALL_BACKEND, CPU_COUNT, DEFAULT_BACKEND |
| from fpie.io import read_images, write_image |
| from process import BaseProcessor, EquProcessor, GridProcessor |
|
|
| from PIL import Image |
| import numpy as np |
| import skimage |
| import skimage.measure |
| import scipy |
| import scipy.signal |
|
|
|
|
class PhotometricCorrection:
    """Blend an inpainted image into its original using the PIE grid solver.

    On construction a command-line style parser is built, parsed with a fixed
    argument list (grid method, ``src`` gradient) and used to configure a
    ``GridProcessor``.  The resulting objects are exposed as ``self.parser``,
    ``self.args``, ``self.backend`` and ``self.proc``.
    """

    def __init__(self, quite: bool = False):
        """Create the solver.

        Args:
            quite: Stored on the instance as ``self.quite``.  It is not
                consulted anywhere in this class; progress messages are
                always printed.
        """
        self.get_parser("cli")
        # The source/target/output file names are placeholders: images are
        # handed to ``run`` directly, but the parser is still used to obtain
        # the solver defaults.
        args = self.parser.parse_args(
            ["--method", "grid", "-g", "src", "-s", "a", "-t", "a", "-o", "a"]
        )
        # ``--mpi-sync-interval`` is only registered when the "mpi" backend is
        # available, so fall back to 0 when the attribute is missing.
        args.mpi_sync_interval = getattr(args, "mpi_sync_interval", 0)
        self.backend = args.backend
        self.args = args
        self.quite = quite
        proc: BaseProcessor = GridProcessor(
            args.gradient,
            args.backend,
            args.cpu,
            args.mpi_sync_interval,
            args.block_size,
            args.grid_x,
            args.grid_y,
        )
        print(
            f"[PIE]Successfully initialize PIE {args.method} solver "
            f"with {args.backend} backend"
        )
        self.proc = proc

    def run(self, original_image, inpainted_image, mode="mask_mode"):
        """Blend ``inpainted_image`` into ``original_image``.

        The last channel of ``original_image`` is used as an alpha channel
        (assumes an RGBA-like H x W x C input -- TODO confirm with callers).
        The blending mask is ``255 - alpha``: where the mask is >= 128 the
        inpainted pixels are used, elsewhere the original pixels are kept.

        Args:
            original_image: Image (anything ``np.array`` accepts) whose last
                channel marks the already-known area.
            inpainted_image: Image of the same height and width holding the
                newly generated content.
            mode: ``"disabled"`` returns ``inpainted_image`` untouched.
                ``"mask_mode"`` grows the mask to whole 8x8 blocks.  Any other
                value (and ``"mask_mode"`` when the mask is empty) marks the
                whole interior as masked and keeps only a thin border of the
                computed mask.

        Returns:
            ``inpainted_image`` itself when correction is disabled or the
            alpha channel sums to less than 1; otherwise a new
            ``PIL.Image.Image`` with the solver result.  Workers whose
            processor is not the root return ``None``.
        """
        print("[PIE] start")
        if mode == "disabled":
            return inpainted_image

        input_arr = np.array(original_image)
        alpha = input_arr[:, :, -1]
        if alpha.sum() < 1:
            # Nothing of the original is marked as known: nothing to match.
            return inpainted_image
        output_arr = np.array(inpainted_image)

        # ``255 - alpha`` allocates a new array, so the in-place edits below
        # never touch ``input_arr``.
        mask = 255 - alpha
        if mode == "mask_mode" and mask.sum() < 1:
            # Empty mask: fall back to the "whole interior" behaviour.
            mode = ""
        if mode == "mask_mode":
            height, width = mask.shape
            # Max-pool over 8x8 blocks, then expand back, so every block that
            # contains a masked pixel becomes masked as a whole.
            mask = skimage.measure.block_reduce(mask, (8, 8), np.max)
            mask = mask.repeat(8, axis=0).repeat(8, axis=1)
            # block_reduce pads sizes that are not a multiple of 8; crop back
            # so the mask matches the images again (no-op for multiples of 8).
            mask = mask[:height, :width]
        else:
            # NOTE(review): the upper bound is -9 (not -8); kept exactly as in
            # the original code -- confirm whether the asymmetry is intended.
            mask[8:-9, 8:-9] = 255
        mask = mask[:, :, np.newaxis].repeat(3, axis=2)

        # Composite: start from the inpainted RGB and restore the original
        # pixels wherever the mask is below the threshold.
        src = output_arr[:, :, 0:3].copy()
        keep_original = mask < 128
        src[keep_original] = input_arr[:, :, 0:3][keep_original]
        tgt = src.copy()

        proc = self.proc
        args = self.args
        if proc.root:
            proc.reset(src, mask, tgt, (args.h0, args.w0), (args.h1, args.w1))
        proc.sync()

        result = tgt
        started = time.time()
        # ``-p 0`` means "report once at the end".  A local is used so that
        # the shared ``self.args`` is not modified between calls.
        stride = args.p or args.n
        for done in range(0, args.n, stride):
            if proc.root:
                result, err = proc.step(stride)
                print(f"[PIE] Iter {done + stride}, abs_err {err}")
            else:
                proc.step(stride)

        if not proc.root:
            # NOTE(review): only the root receives a result from ``step``;
            # presumably non-root workers are not expected to return an image.
            return None

        elapsed = time.time() - started
        print(f"[PIE] Time elapsed: {elapsed:.4f}s")
        return Image.fromarray(result)

    def get_parser(self, gen_type: str) -> argparse.ArgumentParser:
        """Build the argument parser and store it as ``self.parser``.

        Args:
            gen_type: ``"cli"`` additionally registers ``--mask``, the
                ``-h0/-w0/-h1/-w1`` offsets and ``-p``.  ``"gui"`` removes
                the "mpi" backend from the available choices.

        Returns:
            The parser that was also assigned to ``self.parser``.
        """
        parser = argparse.ArgumentParser()
        parser.add_argument(
            "-v", "--version", action="store_true", help="show the version and exit"
        )
        parser.add_argument(
            "--check-backend", action="store_true", help="print all available backends"
        )
        if gen_type == "gui" and "mpi" in ALL_BACKEND:
            # NOTE: this edits the imported ALL_BACKEND list in place, so the
            # removal is visible to every other user of that list.
            ALL_BACKEND.remove("mpi")
        parser.add_argument(
            "-b",
            "--backend",
            type=str,
            choices=ALL_BACKEND,
            default=DEFAULT_BACKEND,
            help="backend choice",
        )
        parser.add_argument(
            "-c",
            "--cpu",
            type=int,
            default=CPU_COUNT,
            help="number of CPU used",
        )
        parser.add_argument(
            "-z",
            "--block-size",
            type=int,
            default=1024,
            help="cuda block size (only for equ solver)",
        )
        parser.add_argument(
            "--method",
            type=str,
            choices=["equ", "grid"],
            default="equ",
            help="how to parallelize computation",
        )
        parser.add_argument("-s", "--source", type=str, help="source image filename")
        if gen_type == "cli":
            parser.add_argument(
                "-m",
                "--mask",
                type=str,
                help="mask image filename (default is to use the whole source image)",
                default="",
            )
        parser.add_argument("-t", "--target", type=str, help="target image filename")
        parser.add_argument("-o", "--output", type=str, help="output image filename")
        if gen_type == "cli":
            parser.add_argument(
                "-h0", type=int, help="mask position (height) on source image", default=0
            )
            parser.add_argument(
                "-w0", type=int, help="mask position (width) on source image", default=0
            )
            parser.add_argument(
                "-h1", type=int, help="mask position (height) on target image", default=0
            )
            parser.add_argument(
                "-w1", type=int, help="mask position (width) on target image", default=0
            )
        parser.add_argument(
            "-g",
            "--gradient",
            type=str,
            choices=["max", "src", "avg"],
            default="max",
            help="how to calculate gradient for PIE",
        )
        parser.add_argument(
            "-n",
            type=int,
            help="how many iteration would you perfer, the more the better",
            default=5000,
        )
        if gen_type == "cli":
            parser.add_argument(
                "-p", type=int, help="output result every P iteration", default=0
            )
        if "mpi" in ALL_BACKEND:
            parser.add_argument(
                "--mpi-sync-interval",
                type=int,
                help="MPI sync iteration interval",
                default=100,
            )
        parser.add_argument(
            "--grid-x", type=int, help="x axis stride for grid solver", default=8
        )
        parser.add_argument(
            "--grid-y", type=int, help="y axis stride for grid solver", default=8
        )
        self.parser = parser
        return parser
|
|
if __name__ == "__main__":
    # Worker loop: each line read from stdin is one request of the form
    # "<base64 original>,<base64 inpainted>,<mode>".  The corrected image is
    # written back to stdout as a single base64-encoded PNG line.  Progress
    # messages printed along the way all start with "[PIE]".
    import sys
    import io
    import base64
    from PIL import Image

    def base64_to_pil(base64_str):
        """Decode a base64 string into a PIL image."""
        data = base64.b64decode(str(base64_str))
        return Image.open(io.BytesIO(data))

    def pil_to_base64(out_pil):
        """Encode a PIL image as a base64 (ASCII) PNG string."""
        out_buffer = io.BytesIO()
        out_pil.save(out_buffer, format="PNG")
        return base64.b64encode(out_buffer.getvalue()).decode("ascii")

    correction_func = PhotometricCorrection(quite=True)
    while True:
        # Text-mode stdin always yields ``str``; an empty string means EOF.
        buffer = sys.stdin.readline()
        print(f"[PIE] suprocess {len(buffer)} {type(buffer)} ")
        if not buffer:
            break
        lst = buffer.strip().split(",")
        img0 = base64_to_pil(lst[0])
        img1 = base64_to_pil(lst[1])
        ret = correction_func.run(img0, img1, mode=lst[2])
        ret_base64 = pil_to_base64(ret)
        sys.stdout.write(f"{ret_base64}\n")
        # Flush once per request so the reader sees the reply immediately.
        sys.stdout.flush()