Skip to content

Instantly share code, notes, and snippets.

View brachi-wernick's full-sized avatar

Brachi Packter brachi-wernick

  • moonactive
  • Israel
View GitHub Profile
@brachi-wernick
brachi-wernick / FieldsConfigurationProvider.java
Created January 10, 2020 11:01
FieldsConfigurationProvider
public class FieldsConfigurationProvider{
private Logger logger = LoggerFactory.getLogger(FieldsConfigurationProvider.class);
private Long lastUpdateTime;
private FieldsConfiguration config;
private String bucketName;
private String filePath;
private long fieldsConfigLoadInterval;
@brachi-wernick
brachi-wernick / FieldsConfigurationProvider.java
Last active January 10, 2020 11:06
FieldsConfigurationProvider
private void watch() {
new Thread(() -> {
Storage storage = StorageOptions.getDefaultInstance().getService();
Bucket bucket = storage.get(bucketName);
while(true) {
try {
Blob blob = bucket.get(filePath);
long currentTime = blob.getUpdateTime();
@brachi-wernick
brachi-wernick / DoFn.java
Created January 10, 2020 11:18
Using field config in the DoFn
@ProcessElement
public void processElement(@Element JsonNode jsonNode, ProcessContext context, PipelineOptions options) {
FieldConfigurationOptions fieldConfigurationOptions = options.as(FieldConfigurationOptions.class);
FieldConfigurationLoader fieldsConfigurationProvider = fieldConfigurationOptions.getFieldsConfigurationProvider();
FieldsConfiguration config = fieldsConfigurationProvider.getConfig();
TableRow tableRow = new TableRow();
@brachi-wernick
brachi-wernick / MyFn.java
Last active February 7, 2020 11:58
MyFn
public class MyFn extends DoFn<String, String> {
private static RedissonClient redisClient;
private final static Object lock = new Object();
private String redisHost;
private int redisPort;
MyFn(String redisHost, Integer redisPort) {
this.redisHost = redisHost;
public interface RedisOptions extends PipelineOptions {
ValueProvider<String> getRedisHost();
void setRedisHost(ValueProvider<String> value);
ValueProvider<Integer> getRedisPort();
void setRedisPort(ValueProvider<Integer> value);
@brachi-wernick
brachi-wernick / RedisClientFactory.java
Last active February 7, 2020 11:54
RedisClientFactory
public class RedisClientFactory implements DefaultValueFactory<RedissonClient> {
@Override
public RedissonClient create(PipelineOptions options) {
RedisOptions redisOptions = options.as(RedisOptions.class);
Config config = new Config();
config.useSingleServer()
.setAddress("redis://" + redisOptions.getRedisHost().get() + ":" + redisOptions.getRedisPort().get()");
@brachi-wernick
brachi-wernick / MyFn.java
Last active February 7, 2020 11:58
MyFn using the factory option
public class MyFn extends DoFn<String, String> {
@ProcessElement
public void processElement(ProcessContext context,PipelineOptions options) {
RedisOptions redisOptions = options.as(RedisOptions.class);
RedissonClient redissonClient = redisOptions.getRedissonClient();
//some logic enrichment
redis.get()
@brachi-wernick
brachi-wernick / MainVerticle.java
Created February 24, 2020 20:59
Vertx hello world
public class MainVerticle extends AbstractVerticle
{
@Override
public void start(Promise<Void> startPromise) throws Exception {
Router router = Router.router(vertx);
router.get("/hello").handler(this::hello);
@brachi-wernick
brachi-wernick / Dockerfile
Last active February 24, 2020 21:11
docker vert.x
# Base image: the 8-jre tag of the `java` image.
# NOTE(review): the `java` image on Docker Hub is marked deprecated in favor of `openjdk` — confirm before rebuilding.
FROM java:8-jre
# File name of the fat jar; referenced by the COPY step below
ENV VERTICLE_FILE starter-1.0.0-SNAPSHOT-fat.jar
# Set the location of the verticles
ENV VERTICLE_HOME /usr/verticles
# Declare 8882 as the port the container is expected to listen on;
# EXPOSE is documentation only and does not publish the port by itself
EXPOSE 8882
# Copy your fat jar to the container
COPY target/$VERTICLE_FILE $VERTICLE_HOME/
# Launch the verticle
# NOTE(review): the command string handed to `sh -c` is not visible in this excerpt — presumably a following CMD supplies it; confirm.
WORKDIR $VERTICLE_HOME
ENTRYPOINT ["sh", "-c"]
public class RedisClient {
private static RedissonClient redisClient;
public static synchronized void newInstance(){
if(redisClient==null) {
Config config = new Config();
config.setCodec(new StringCodec());
config.setThreads(Integer.parseInt(System.getenv("threads")));