This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
public class FieldsConfigurationProvider{ | |
private Logger logger = LoggerFactory.getLogger(FieldsConfigurationProvider.class); | |
private Long lastUpdateTime; | |
private FieldsConfiguration config; | |
private String bucketName; | |
private String filePath; | |
private long fieldsConfigLoadInterval; |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
private void watch() { | |
new Thread(() -> { | |
Storage storage = StorageOptions.getDefaultInstance().getService(); | |
Bucket bucket = storage.get(bucketName); | |
while(true) { | |
try { | |
Blob blob = bucket.get(filePath); | |
long currentTime = blob.getUpdateTime(); |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
@ProcessElement | |
public void processElement(@Element JsonNode jsonNode, ProcessContext context, PipelineOptions options) { | |
FieldConfigurationOptions fieldConfigurationOptions = options.as(FieldConfigurationOptions.class); | |
FieldConfigurationLoader fieldsConfigurationProvider = fieldConfigurationOptions.getFieldsConfigurationProvider(); | |
FieldsConfiguration config = fieldsConfigurationProvider.getConfig(); | |
TableRow tableRow = new TableRow(); |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
public class MyFn extends DoFn<String, String> { | |
private static RedissonClient redisClient; | |
private final static Object lock = new Object(); | |
private String redisHost; | |
private int redisPort; | |
MyFn(String redisHost, Integer redisPort) { | |
this.redisHost = redisHost; |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
public interface RedisOptions extends PipelineOptions { | |
ValueProvider<String> getRedisHost(); | |
void setRedisHost(ValueProvider<String> value); | |
ValueProvider<Integer> getRedisPort(); | |
void setRedisPort(ValueProvider<Integer> value); | |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
public class RedisClientFactory implements DefaultValueFactory<RedissonClient> { | |
@Override | |
public RedissonClient create(PipelineOptions options) { | |
RedisOptions redisOptions = options.as(RedisOptions.class); | |
Config config = new Config(); | |
config.useSingleServer() | |
.setAddress("redis://" + redisOptions.getRedisHost().get() + ":" + redisOptions.getRedisPort().get()"); |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
public class MyFn extends DoFn<String, String> { | |
@ProcessElement | |
public void processElement(ProcessContext context,PipelineOptions options) { | |
RedisOptions redisOptions = options.as(RedisOptions.class); | |
RedissonClient redissonClient = redisOptions.getRedissonClient(); | |
//some logic enrichment | |
redis.get() |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
public class MainVerticle extends AbstractVerticle | |
{ | |
@Override | |
public void start(Promise<Void> startPromise) throws Exception { | |
Router router = Router.router(vertx); | |
router.get("/hello").handler(this::hello); |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
FROM java:8-jre | |
ENV VERTICLE_FILE starter-1.0.0-SNAPSHOT-fat.jar | |
# Set the location of the verticles | |
ENV VERTICLE_HOME /usr/verticles | |
EXPOSE 8882 | |
# Copy your fat jar to the container | |
COPY target/$VERTICLE_FILE $VERTICLE_HOME/ | |
# Launch the verticle | |
WORKDIR $VERTICLE_HOME | |
ENTRYPOINT ["sh", "-c"] |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
public class RedisClient { | |
private static RedissonClient redisClient; | |
public static synchronized void newInstance(){ | |
if(redisClient==null) { | |
Config config = new Config(); | |
config.setCodec(new StringCodec()); | |
config.setThreads(Integer.parseInt(System.getenv("threads"))); |