A full index rebuild involves three pieces of work (a sketch follows the list):

- preparing the data source
- building the index over the data in parallel
- swapping in the new index and merging incremental indexes
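As a minimal sketch of the three phases, assuming the snapshot lives on HDFS and each partition builds a local Lucene shard (the paths, field names, and the choice of Lucene as the indexing library are all assumptions, not necessarily this post's setup):

```scala
import java.nio.file.Paths
import java.util.UUID

import org.apache.lucene.analysis.standard.StandardAnalyzer
import org.apache.lucene.document.{Document, Field, TextField}
import org.apache.lucene.index.{IndexWriter, IndexWriterConfig}
import org.apache.lucene.store.FSDirectory
import org.apache.spark.{SparkConf, SparkContext}

object FullIndexBuild {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("full-index-build"))

    // 1. Data source preparation: read a full snapshot (hypothetical path).
    val docs = sc.textFile("hdfs:///snapshot/docs")

    // 2. Parallel index construction: every partition writes an independent
    //    Lucene shard, so the build scales with the number of partitions.
    docs.foreachPartition { lines =>
      val dir = FSDirectory.open(Paths.get(s"/tmp/index-${UUID.randomUUID()}"))
      val writer = new IndexWriter(dir, new IndexWriterConfig(new StandardAnalyzer()))
      lines.foreach { line =>
        val d = new Document()
        d.add(new TextField("body", line, Field.Store.YES))
        writer.addDocument(d)
      }
      writer.close()
    }

    // 3. Index swap: publish the finished shards to the serving nodes and
    //    merge any incremental index built during the run (not shown).
    sc.stop()
  }
}
```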
```java
// Imports added here assume an Elasticsearch plugin environment, which is
// where this Settings/@Inject combination comes from.
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.settings.SettingsException;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.JedisPoolConfig;

public class RedisClient {
    private JedisPool pool;

    @Inject
    public RedisClient(Settings settings) {
        try {
            // read redis.host / redis.port from the node settings,
            // defaulting the port to 6379
            pool = new JedisPool(new JedisPoolConfig(), settings.get("redis.host"),
                    settings.getAsInt("redis.port", 6379));
        } catch (SettingsException e) {
            // ignore: leave the pool unset if the redis settings are malformed
        }
    }
}
```
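A hypothetical usage sketch (key and value are placeholders): borrow a connection from the pool and always return it, since `close()` on a pooled `Jedis` hands the connection back rather than tearing it down.

```scala
val jedis = pool.getResource
try {
  jedis.set("some:key", "value") // placeholder key/value
} finally {
  jedis.close()                  // returns the connection to the pool
}
```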
Q: In a streaming app that pulls data from one Kafka topic with the high level API, there should end up being just one KafkaReceiver, right? And from what I see in the code, that KafkaReceiver is only dispatched to a single executor to receive data.

A: Yes.

Q: Even if the topic has multiple partitions?

A: Yes. You can also create multiple Receivers, and each Receiver can be configured to consume with several threads (sketched below).
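A minimal sketch of that multi-receiver setup, assuming the receiver-based API from spark-streaming-kafka 0.8; the ZooKeeper address, group id, and topic name are placeholders:

```scala
import org.apache.spark.SparkConf
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}

val conf = new SparkConf().setAppName("multi-receiver")
val ssc = new StreamingContext(conf, Seconds(10))

// Three receivers, each occupying a core on some executor; the Map value (2)
// is the number of consumer threads inside each receiver.
val streams = (1 to 3).map { _ =>
  KafkaUtils.createStream(ssc, "zk:2181", "my-group", Map("my-topic" -> 2))
}

// Union the receiver streams back into a single DStream for processing.
val lines = ssc.union(streams).map(_._2)
```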
```scala
@At(path = Array("/upload"), types = Array(RestRequest.Method.POST))
@BasicInfo(
  desc = "Instructs the specified servers to download the file at the given URL into the given directory",
  state = State.alpha,
  testParams = "",
  testResult = "task submit",
  author = "WilliamZhu",
  email = "[email protected]"
)
def upload = {
  // handler body elided in the original
}
```
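Assuming ServiceFramework's routing semantics, the two annotations register `upload` as a POST handler at `/upload`; the elided body would read the request parameters (which servers, source URL, destination directory) and submit a download task, matching the `testResult` of "task submit".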
```scala
import org.joda.time.DateTime
import org.joda.time.format.DateTimeFormat

import scala.collection.mutable.ArrayBuffer

/**
 * Created by allwefantasy on 1/28/16.
 */
object NginxParser {
  def parse(line: String) = {
    // parse body elided in the original
  }
}
```
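The body of `parse` is cut off above. Purely as a hypothetical sketch of what an nginx access-log parser of this shape might do (the field layout and return type are assumptions, not the post's code):

```scala
import java.util.Locale
import org.joda.time.format.DateTimeFormat

object NginxParserSketch {
  // nginx's default time_local format, e.g. 12/Mar/2016:06:25:24 +0800
  private val fmt =
    DateTimeFormat.forPattern("dd/MMM/yyyy:HH:mm:ss Z").withLocale(Locale.ENGLISH)

  def parse(line: String): Array[String] = {
    // the timestamp sits between [ and ] in the combined log format
    val ts = line.substring(line.indexOf('[') + 1, line.indexOf(']'))
    val millis = fmt.parseDateTime(ts).getMillis
    // the client ip is the first whitespace-delimited field
    Array(line.takeWhile(_ != ' '), millis.toString)
  }
}
```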
```scala
import _root_.kafka.serializer.StringDecoder
import com.stuq.nginx.parser.NginxParser
import org.apache.spark.SparkConf
import org.apache.spark.streaming._
import org.apache.spark.streaming.kafka.KafkaUtils

/**
 * 4/25/16 WilliamZhu([email protected])
 */
```
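A minimal sketch of how these imports are usually wired together, assuming the direct (receiver-less) Kafka API; the broker list and topic name are placeholders:

```scala
val conf = new SparkConf().setAppName("nginx-log-analysis")
val ssc = new StreamingContext(conf, Seconds(30))

val kafkaParams = Map("metadata.broker.list" -> "broker1:9092")
val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
  ssc, kafkaParams, Set("nginx-logs"))

// drop the Kafka key, then parse each raw nginx log line
val parsed = messages.map(_._2).map(NginxParser.parse)
parsed.print()

ssc.start()
ssc.awaitTermination()
```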
```
Exception in thread "main" org.apache.spark.SparkException: RDD transformations and actions can only be invoked by the driver, not inside of other transformations; for example, rdd1.map(x => rdd2.values.count() * x) is invalid because the values transformation and count action cannot be performed inside of the rdd1.map transformation. For more information, see SPARK-5063.
    at org.apache.spark.rdd.RDD.org$apache$spark$rdd$RDD$$sc(RDD.scala:87)
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:316)
    at org.apache.spark.rdd.PairRDDFunctions.partitionBy(PairRDDFunctions.scala:530)
    at org.apache.spark.streaming.rdd.MapWithStateRDD$.createFromPairRDD(MapWithStateRDD.scala:189)
    at org.apache.spark.streaming.dstream.InternalMapWithStateDStream.compute(MapWithStateDStream.scala:146)
    at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:352)
    at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.s
```
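The message names the anti-pattern directly: an RDD (here, the one `mapWithState` builds its state from, per the `MapWithStateRDD.createFromPairRDD` frame) is being used inside another RDD's transformation. A minimal illustration of the rule itself, not the post's code:

```scala
val rdd1 = sc.parallelize(1 to 10)
val rdd2 = sc.parallelize(1 to 5)

// Invalid: rdd2.count() is an action, but it would run inside rdd1's map
// closure on an executor; Spark rejects this with SPARK-5063.
// rdd1.map(x => x * rdd2.count())

// Workaround: run the inner action on the driver first, then close over
// the plain value.
val c = rdd2.count()
val ok = rdd1.map(x => x * c)
```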
```
INFO 08-06 16:24:33,484 - main Query [
CREATE TABLE IF NOT EXISTS CARBON1
(CUSTOMERNAME STRING, DAY DOUBLE, HITMISS STRING, HOUR DOUBLE, HTTPCODE STRING, ISP STRING, PROVINCE STRING, PTIME DOUBLE, REQUEST LONG, TRAFFIC LONG, USERID STRING)
STORED BY 'ORG.APACHE.CARBONDATA.FORMAT'
]
INFO 08-06 16:24:34,652 - Parsing command: CREATE TABLE IF NOT EXISTS carbon1
(customerName STRING, day DOUBLE, hitMiss STRING, hour DOUBLE, httpCode STRING, isp STRING, province STRING, ptime DOUBLE, request LONG, traffic LONG, userId STRING)
STORED BY 'org.apache.carbondata.format'
NoViableAltException(162@[])
    at org.apache.hadoop.hive.ql.parse.HiveParser.type(HiveParser.java:38610)
```
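The `NoViableAltException` comes out of `HiveParser.type`, i.e. the parser gives up on a column type. HiveQL's grammar has no `LONG` type (its 64-bit integer type is spelled `BIGINT`), so a likely fix, sketched here with a hypothetical `cc: CarbonContext`, is:

```scala
cc.sql(
  """CREATE TABLE IF NOT EXISTS carbon1
    |(customerName STRING, day DOUBLE, hitMiss STRING, hour DOUBLE,
    | httpCode STRING, isp STRING, province STRING, ptime DOUBLE,
    | request BIGINT, traffic BIGINT, userId STRING)
    |STORED BY 'org.apache.carbondata.format'""".stripMargin)
```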
```json
{
  "XXX": {
    "desc": "XXX",
    "strategy": "streaming.core.strategy.SparkStreamingStrategy",
    "algorithm": [],
    "ref": [],
    "compositor": [
      {
        "name": "streaming.core.compositor.spark.streaming.source.KafkaStreamingCompositor",
        "params": [
```