1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128
|
/**
 * Runs a date-histogram aggregation with nested terms sub-aggregations and flattens the
 * response into one row per (time bucket, group-column value combination).
 *
 * <p>The query filters documents whose {@code @timestamp} lies in
 * {@code [startTime, endTime]} (both bounds inclusive) and, for every non-null, non-empty
 * entry of {@code match}, adds a mandatory terms clause on that key. The histogram is named
 * {@code "ts"} and buckets on the {@code processTime} field. The terms aggregations are named
 * {@code "0"}, {@code "1"}, ... in the order the columns appear in {@code groupCol}, each
 * nested inside the previous one.
 *
 * <p>Any number of group columns is supported; the response is walked recursively down to
 * the innermost terms level, and the document count of that innermost bucket is reported.
 *
 * @param index     index name passed to {@code client.prepareSearch}
 * @param type      comma-separated document types; ignored when empty
 * @param histogram interval expression handed to {@code DateHistogramInterval}
 * @param groupCol  comma-separated field names to group by, outermost first
 * @param match     optional exact-match filters (field name to value); may be {@code null}
 * @param size      bucket limit applied to every terms aggregation level
 * @param startTime lower bound of the {@code @timestamp} range (inclusive)
 * @param endTime   upper bound of the {@code @timestamp} range (inclusive)
 * @return list of rows; each row maps {@code "processTime"} to the histogram bucket key
 *         string, each group column name to its bucket key, and {@code "docCount"} to the
 *         innermost bucket's document count narrowed to {@code int}
 * @throws IllegalArgumentException if {@code groupCol} yields no column names
 */
public static List<Map<String, Object>> getHistogramSubCountList(String index, String type, String histogram,
        String groupCol, Map<String, Object> match, int size, long startTime, long endTime) {
    List<Map<String, Object>> listMap = new LinkedList<>();

    String[] groupCols = groupCol.split(",");
    if (groupCols.length == 0) {
        // e.g. groupCol == ","; without at least one column there is no terms level to read back.
        throw new IllegalArgumentException("groupCol must name at least one field: '" + groupCol + "'");
    }

    SearchRequestBuilder searchRequest = client.prepareSearch(index);
    if (StringUtils.isNotEmpty(type)) {
        searchRequest.setTypes(type.split(","));
    }

    // Time-range filter plus one mandatory terms clause per usable match entry.
    BoolQueryBuilder boolQueryBuilder = QueryBuilders.boolQuery();
    boolQueryBuilder.must(
            QueryBuilders.rangeQuery("@timestamp")
                    .from(startTime)
                    .to(endTime)
                    .includeLower(true)
                    .includeUpper(true));
    if (match != null) {
        for (Map.Entry<String, Object> entry : match.entrySet()) {
            Object value = entry.getValue();
            if (value != null && !value.equals("")) {
                boolQueryBuilder.must(QueryBuilders.termsQuery(entry.getKey(), value));
            }
        }
    }

    DateHistogramBuilder db = AggregationBuilders.dateHistogram("ts")
            .field("processTime")
            .interval(new DateHistogramInterval(histogram));

    // Chain the terms aggregations: level i is named String.valueOf(i) and nested in level i - 1.
    AggregationBuilder tb = null;
    AggregationBuilder stb = null;
    for (int i = 0; i < groupCols.length; i++) {
        AggregationBuilder ntb = AggregationBuilders.terms(i + "").field(groupCols[i]).size(size);
        if (tb == null) {
            tb = ntb;
        } else {
            stb.subAggregation(ntb);
        }
        stb = ntb;
    }
    db.subAggregation(tb);

    searchRequest.setQuery(boolQueryBuilder)
            .setSearchType(SearchType.QUERY_THEN_FETCH)
            .addAggregation(db);
    SearchResponse searchResponse = searchRequest.execute().actionGet();
    LOG.debug("searchRequest = " + searchRequest);
    LOG.debug("searchResponse = " + searchResponse);

    InternalHistogram ts = searchResponse.getAggregations().get("ts");
    List<InternalHistogram.Bucket> buckets = ts.getBuckets();
    for (InternalHistogram.Bucket bt : buckets) {
        String processTime = bt.getKeyAsString();
        Terms terms = bt.getAggregations().get("0");
        Object[] keys = new Object[groupCols.length];
        for (Bucket bucket : terms.getBuckets()) {
            collectHistogramRows(processTime, groupCols, 0, bucket, keys, listMap);
        }
    }
    return listMap;
}

/**
 * Descends one terms level of a histogram bucket and appends a row for every innermost bucket.
 *
 * @param processTime key string of the enclosing date-histogram bucket
 * @param groupCols   group column names, outermost first
 * @param level       zero-based depth of {@code bucket} within the terms chain
 * @param bucket      terms bucket at {@code level}
 * @param keys        scratch array holding the bucket keys chosen at levels {@code 0..level}
 * @param out         list receiving the flattened rows
 */
private static void collectHistogramRows(String processTime, String[] groupCols, int level,
        Bucket bucket, Object[] keys, List<Map<String, Object>> out) {
    // Keys are kept as returned (String, Long, Double, ...); no cast, so numeric fields work too.
    keys[level] = bucket.getKey();

    if (level < groupCols.length - 1) {
        Terms nested = bucket.getAggregations().get(String.valueOf(level + 1));
        for (Bucket child : nested.getBuckets()) {
            collectHistogramRows(processTime, groupCols, level + 1, child, keys, out);
        }
        return;
    }

    // Innermost level reached: emit one row carrying every key on the path.
    Map<String, Object> map = new HashMap<>();
    map.put("processTime", processTime);
    for (int i = 0; i < groupCols.length; i++) {
        map.put(groupCols[i], keys[i]);
    }
    map.put("docCount", (int) bucket.getDocCount());
    LOG.debug(map.toString());
    out.add(map);
}
|