Skip to content

Instantly share code, notes, and snippets.

@maiha
Last active January 20, 2017 05:00
Show Gist options
  • Save maiha/ed710bd4b9d53a2be249 to your computer and use it in GitHub Desktop.
Save maiha/ed710bd4b9d53a2be249 to your computer and use it in GitHub Desktop.
migrate kafka from 0.8 to 0.9

"__consumer_offsets"

-kafka.server.OffsetManager.OffsetsTopicName
+kafka.coordinator.GroupCoordinator.GroupMetadataTopicName

not found: value ConsumerMetadataRequest

  • renamed to GroupCoordinatorRequest

not found: value ConsumerMetadataResponse

  • renamed to GroupCoordinatorResponse

org.I0Itec.zkclient.exception.ZkException: org.apache.zookeeper.KeeperException$InvalidACLException: KeeperErrorCode = InvalidACL for /config/topics

  • use JaasUtils.isZkSecurityEnabled() to know zk security flag
-  ZkUtils(zkClient, isZkSecurityEnabled = true)
+  import org.apache.kafka.common.security.JaasUtils
+  ZkUtils(zkClient, isZkSecurityEnabled = JaasUtils.isZkSecurityEnabled())

value host is not a member of kafka.cluster.Broker

  • Broker -> host now becomes Broker -> BrokerEndPoint(s) -> host
  • NOTE: properly set SecurityProtocol like PLAINTEXT
-  b.host
+  import org.apache.kafka.common.protocol.SecurityProtocol
+  b.getBrokerEndPoint(SecurityProtocol.PLAINTEXT).host

org.I0Itec.zkclient.exception.ZkMarshallingError: java.io.StreamCorruptedException: invalid stream header: 7B226A6D

  • specify ZKStringSerializer by using ZkUtils.createZkClient
-  new ZkClient(s, 30000, 30000)
+  ZkUtils.createZkClient(s, 30000, 30000)

not found: value OffsetManager

-kafka.server.OffsetManager.OffsetsTopicName
+kafka.coordinator.GroupCoordinator.GroupMetadataTopicName

Receive

  • buffer was been renamed to payload
- val offsetResponse = OffsetResponse.readFrom(leaderChannel.receive().buffer)
+ val offsetResponse = OffsetResponse.readFrom(leaderChannel.receive().payload())

Invalid value 20 seconds for configuration timeout.ms: Not a number of type INT

  • 0.8: accepts "20 seconds" String
  • 0.9: rejects it
- timeout.toMillis.toString
+ timeout.totoString

Utils

-import kafka.utils.Utils
+import org.apache.kafka.common.utils.Utils
Utils.readBytes(...)

ZkClient

  • use createZkClient to set default serializer
-  new ZkClient(s, 30000, 30000)
+  ZkUtils.createZkClient(s, 30000, 30000)

ZkUtils

  • many admin methods now expects ZkUtils rather than ZkClient
val zk = ZkUtils(zkClient, isZkSecurityEnabled = false)

ZKStringSerializer

  • become private object
  • use ZkUtils.createZkClient to use it
- new ZkClient(zkConfig.servers, zkConfig.sessionTimeout, zkConfig.connectionTimeout, ZKStringSerializer)
+ ZkUtils.createZkClient(zkConfig.servers, zkConfig.sessionTimeout, zkConfig.connectionTimeout)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment