Custom serde for windowed records. Why ? http://mail-archives.apache.org/mod_mbox/kafka-users/201701.mbox/%3cCABQKjk+NtexSyt3gB0hA8w0kYqSkwxWkVUpvSXH9h8uJD_SLZA@mail.gmail.com%3e.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Exception in thread "StreamThread-1" org.apache.kafka.streams.errors.StreamsException: Exception caught in process. taskId=0_23, processor=KSTREAM-SOURCE-0000000000, topic=abc, partition=23, offset=388592 | |
at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:216) | |
at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:641) | |
at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:368) | |
Caused by: org.apache.kafka.streams.errors.StreamsException: task [0_23] exception caught when producing | |
at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.checkForException(RecordCollectorImpl.java:119) | |
at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:76) | |
at org.apache.kafka.streams.processor.internals.SinkNode.process(SinkNode.java:79) | |
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:83) | |
at org.apache.kafka.streams.ks |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
KStream<Key, Value>[] branches = stream.branch((k, v) -> v.getType().equals("abc")); | |
branches[0].to(keySerde, valueSerde, "topic-abc"); | |
branches = branches[1].branch((k, v) -> v.getType().equals("def")); | |
branches[0].to(keySerde, valueSerde, "topic-def"); | |
branches = branches[1].branch((k, v) -> v.getType().equals("ghi")); | |
branches[0].to(keySerde, valueSerde, "topic-ghi"); | |
// .... |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/** | |
* Handle records with a timestamp in their Avro value. | |
* Expects a LONG field named "timestamp". | |
* Any problem makes this extractor return the record's internal timestamp. | |
*/ | |
public class InValueTimestampExtractor implements TimestampExtractor { | |
@Override | |
public long extract(ConsumerRecord<Object, Object> record) { | |
if (record != null && record.value() != null) { |
The code below could be invalid, it's being discussed on the Kafka mailing list here: http://mail-archives.apache.org/mod_mbox/kafka-users/201701.mbox/%3cCABQKjkKMwQJKzzvJbFS6jHigmQgJ675aMhsqiRptmasf4kZF4A@mail.gmail.com%3e
Download AWS usage reports, in XML, CSV or parsed CSV. Based on the Python gist A script to query the Amazon Web Services (S3/EC2/etc) usage reports programmatically..
require 'aws_usage'
# Yield rows from report
aws_usage = AwsUsage.new('[email protected]', 'mypassword')
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
[user@server yum.repos.d]# yum install maven2 | |
Loaded plugins: fastestmirror, presto | |
Loading mirror speeds from cached hostfile | |
* base: centos.crazyfrogs.org | |
* epel: mirror.bytemark.co.uk | |
* extras: centos.crazyfrogs.org | |
* jpackage-generic: jpackage.netmindz.net | |
* jpackage-generic-updates: jpackage.netmindz.net | |
* rpmforge: apt.sw.be | |
* rpmforge-extras: apt.sw.be |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
require 'rubygems' | |
require 'sinatra' | |
get '/' do | |
Dir.mktmpdir('screenme') do |dir| | |
file = File.join(dir, Time.now.strftime('%Y%m%d%H%M%S') + '.png') | |
%x(/usr/sbin/screencapture -x #{file}) | |
send_file file | |
end | |
end |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Riak::Client::CurbBackend.class_eval do | |
private | |
def perform_with_logs(method, uri, headers, expect, data=nil) | |
puts "\e[31m#{method.upcase} #{uri}\e[0m" | |
puts "\t\e[33m#{headers}\e[0m" | |
# puts "\t\e[36m#{expect}\e[0m" | |
puts "\t\e[34m#{data}\e[0m" if data | |
# puts "\t\e[37m#{caller.last}\e[0m" | |
puts | |
perform_without_logs(method, uri, headers, expect, data) |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Following http://gist.github.com/472547 | |
# If links are already in the 'b2' objects, then getting the top 5 can be done in the first map function | |
results = Riak::MapReduce.new(client). | |
add('b1', 'a'). | |
link({:bucket => 'b2', :tag => '_', :keep => false}). | |
# This first map only emits 5 items instead of all of them | |
map("function(v, keydata, arg) { return v.not_found ? [] : JSON.parse(v.values[0].data).links.map(function(x) { return [arg, x]; }).slice(0, 5); }", :keep => false, :arg => 'b3'). | |
# The following map did not get the 5 items in the correct order ... | |
map("function(v) { item = JSON.parse(v.values[0].data); return [item]; }", :keep => false). |
NewerOlder