Last active
January 8, 2024 12:58
-
-
Save javrasya/513f838a8af355b51506ca2a2dc1e3d8 to your computer and use it in GitHub Desktop.
Custom S3FileIO which keeps re-opens a new S3 client and use it if the original one is closed. The bug it covers happens due to messed up lifecycle issue. This works with Iceberg version 1.4 not lower.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import org.apache.iceberg.aws.AwsClientFactories; | |
import org.apache.iceberg.aws.s3.S3FileIO; | |
import org.apache.iceberg.aws.s3.S3FileIOProperties; | |
import org.apache.iceberg.util.SerializableSupplier; | |
import software.amazon.awssdk.services.s3.S3Client; | |
import java.util.concurrent.atomic.AtomicBoolean; | |
public class CustomS3FileIO extends S3FileIO { | |
private final AtomicBoolean closed = new AtomicBoolean(false); | |
private final SerializableSupplier<S3Client> s3; | |
private transient volatile S3Client backupClient; | |
public CustomS3FileIO() { | |
this.s3 = AwsClientFactories.defaultFactory()::s3; | |
} | |
public CustomS3FileIO(SerializableSupplier<S3Client> s3) { | |
super(s3); | |
this.s3 = s3; | |
} | |
public CustomS3FileIO(SerializableSupplier<S3Client> s3, S3FileIOProperties s3FileIOProperties) { | |
super(s3, s3FileIOProperties); | |
this.s3 = s3; | |
} | |
@Override | |
public S3Client client() { | |
if (closed.compareAndSet(true, false)) { | |
synchronized (this) { | |
if (backupClient == null) { | |
this.backupClient = this.s3.get(); | |
} | |
} | |
} | |
if (null != backupClient) { | |
return backupClient; | |
} else { | |
return super.client(); | |
} | |
} | |
@Override | |
public void close() { | |
synchronized (closed) { | |
super.close(); | |
if (closed.compareAndSet(false, true)) { | |
if (backupClient != null) { | |
backupClient.close(); | |
} | |
} | |
} | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment