Wikimedia To Opensearch

Overview

  • Wikimedia ⇒ Kafka ⇒ Opensearch
  • Java libraries: OkHttp3 and OkHttp EventSource;
  • Producer (Wikimedia): WikimediaChangeHandler and WikimediaChangeProducer;
  • Consumer (OpenSearch): OpenSearchConsumer, using opensearch-java + httpclient5;
  • https://stream.wikimedia.org/v2/stream/recentchange
  • https://esjewett.github.io/wm-eventsource-demo
  • https://codepen.io/Krinkle/pen/BwEKgW?editors=1010
  • REST API calls are made through OpenSearch Dashboards; Bonsai.io can be used as a hosted alternative;
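
The recentchange endpoint is a Server-Sent Events (SSE) stream, which OkHttp EventSource parses before it calls the handler. As a rough illustration of the wire format only, the sketch below splits one raw SSE frame into its fields using plain JDK classes. The class name `SseFrameSketch` and the sample payload are made up for illustration, and the sketch assumes `\n` line endings; the real library additionally handles reconnection and streaming input.

```java
import java.util.LinkedHashMap;
import java.util.Map;

class SseFrameSketch {
    // Parses one SSE frame (lines of "field: value") into a field -> value map.
    // Repeated "data" lines are joined with '\n'; for other fields the last value wins.
    static Map<String, String> parseFrame(String frame) {
        Map<String, String> fields = new LinkedHashMap<>();
        for (String line : frame.split("\n")) {
            if (line.isEmpty() || line.startsWith(":")) {
                continue; // blank line or comment line
            }
            int colon = line.indexOf(':');
            String name = colon < 0 ? line : line.substring(0, colon);
            String value = colon < 0 ? "" : line.substring(colon + 1);
            if (value.startsWith(" ")) {
                value = value.substring(1); // a single leading space is stripped
            }
            if (name.equals("data")) {
                fields.merge(name, value, (a, b) -> a + "\n" + b);
            } else {
                fields.put(name, value);
            }
        }
        return fields;
    }

    public static void main(String[] args) {
        // Illustrative frame, not a captured Wikimedia event.
        String frame = "event: message\n"
                + "data: {\"type\":\"edit\",\"wiki\":\"enwiki\"}\n";
        Map<String, String> fields = parseFrame(frame);
        System.out.println(fields.get("event"));
        System.out.println(fields.get("data"));
    }
}
```

In the handler shown later, the `data` field corresponds to what `messageEvent.getData()` returns.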

Kafka Environment

version: '3.8'
services:
  kafka:
    image: apache/kafka:3.7.0
    container_name: kafka
    privileged: true
    hostname: kafka
    ports:
      - "9092:9092"
    environment:
      KAFKA_NODE_ID: 1
      KAFKA_LOG_DIRS: '/tmp/kafka-log'
      CLUSTER_ID: 'YWU3MzE1YmVmYzhiMTFlZT'
      KAFKA_PROCESS_ROLES: 'broker,controller'
      KAFKA_INTER_BROKER_LISTENER_NAME: 'PLAINTEXT'
      KAFKA_CONTROLLER_LISTENER_NAMES: 'CONTROLLER'
      KAFKA_CONTROLLER_QUORUM_VOTERS: '1@kafka:29093'
      KAFKA_LISTENERS: 'CONTROLLER://:29093,PLAINTEXT_HOST://:9092,PLAINTEXT://:19092'
      KAFKA_ADVERTISED_LISTENERS: 'PLAINTEXT_HOST://192.168.0.123:9092,PLAINTEXT://kafka:19092'
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: 'CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT'
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
networks:
  default:
    name: network-common
    external: true
  • vi /opt/kafka/config/kraft/server.properties
#controller.quorum.voters=1@localhost:9093
controller.quorum.voters=1@192.168.0.123:9093
#listeners=PLAINTEXT://:9092,CONTROLLER://:9093
listeners=CONTROLLER://:29093,PLAINTEXT_HOST://:9092,PLAINTEXT://:19092
#advertised.listeners=PLAINTEXT://localhost:9092
advertised.listeners=PLAINTEXT_HOST://192.168.0.123:9092,PLAINTEXT://kafka:19092
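
The listener strings pack several `NAME://host:port` entries into one value, and the advertised entry named `PLAINTEXT_HOST` is the address the Java clients later in this post connect to. The sketch below, using a hypothetical helper class `ListenerMap` that is not part of Kafka, splits such a value into a name-to-address map so the mapping is easier to read.

```java
import java.util.LinkedHashMap;
import java.util.Map;

class ListenerMap {
    // Parses "NAME://host:port,NAME://host:port" into NAME -> "host:port".
    static Map<String, String> parse(String listeners) {
        Map<String, String> result = new LinkedHashMap<>();
        for (String entry : listeners.split(",")) {
            String[] parts = entry.trim().split("://", 2);
            if (parts.length != 2) {
                throw new IllegalArgumentException("Malformed listener: " + entry);
            }
            result.put(parts[0], parts[1]);
        }
        return result;
    }

    public static void main(String[] args) {
        // Value copied from advertised.listeners in this post.
        Map<String, String> advertised = parse(
                "PLAINTEXT_HOST://192.168.0.123:9092,PLAINTEXT://kafka:19092");
        advertised.forEach((name, address) -> System.out.println(name + " -> " + address));
    }
}
```

Clients outside the Docker network use the `PLAINTEXT_HOST` address, while containers on the same network can use `kafka:19092`.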

Opensearch

Open Search Prerequisite

# disable memory paging and swapping for better performance
sudo swapoff -a
# edit the sysctl config
sudo vi /etc/sysctl.conf
# add this line, or change the existing value
vm.max_map_count=262144
# reload kernel parameters using sysctl
sudo sysctl -p
# verify the change
cat /proc/sys/vm/max_map_count
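
The same verification can be done from Java before starting the stack. This is a minimal sketch, assuming a Linux host where `/proc/sys/vm/max_map_count` is readable; the class name `MaxMapCountCheck` is hypothetical and the threshold is simply the value configured above.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;

class MaxMapCountCheck {
    static final long REQUIRED = 262_144L; // value set in /etc/sysctl.conf above

    // Returns true when the raw file content holds a value >= REQUIRED.
    static boolean isSufficient(String rawValue) {
        return Long.parseLong(rawValue.trim()) >= REQUIRED;
    }

    public static void main(String[] args) throws IOException {
        Path procFile = Path.of("/proc/sys/vm/max_map_count");
        if (!Files.isReadable(procFile)) {
            System.out.println("Not a Linux host (or file unreadable); skipping check.");
            return;
        }
        String raw = Files.readString(procFile);
        System.out.println("vm.max_map_count=" + raw.trim()
                + (isSufficient(raw) ? " (ok)" : " (too low)"));
    }
}
```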

Open Search Compose

docker pull opensearchproject/opensearch:1.3.16 && \
docker pull opensearchproject/opensearch-dashboards:1.3.16

version: '3.8'
services:
  opensearch:
    image: opensearchproject/opensearch:1.3.16
    container_name: opensearch
    environment:
      discovery.type: single-node
      plugins.security.disabled: true
      compatibility.override_main_response_version: true
    ports:
      - "9200:9200"
      - "9600:9600"
  opensearch-dashboard:
    image: opensearchproject/opensearch-dashboards:1.3.16
    container_name: opensearch-dashboard
    ports:
      - "5601:5601"
    environment:
      OPENSEARCH_HOSTS: '["http://opensearch:9200"]'
      DISABLE_SECURITY_DASHBOARDS_PLUGIN: "true"
  • http://192.168.0.123:5601
  • http://192.168.0.123:9200

Producer

Producer Dependency

<properties>
    <okhttp.eventsource>2.7.1</okhttp.eventsource>
</properties>

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
</dependency>
<dependency>
    <groupId>com.squareup.okhttp3</groupId>
    <artifactId>okhttp</artifactId>
</dependency>
<dependency>
    <groupId>com.launchdarkly</groupId>
    <artifactId>okhttp-eventsource</artifactId>
    <version>${okhttp.eventsource}</version>
</dependency>
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-log4j2</artifactId>
</dependency>

WikimediaChangeHandler

import com.launchdarkly.eventsource.EventHandler;
import com.launchdarkly.eventsource.MessageEvent;
import java.lang.invoke.MethodHandles;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class WikimediaChangeHandler implements EventHandler {

    private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());

    String topic;
    KafkaProducer<String, String> kafkaProducer;

    public WikimediaChangeHandler(KafkaProducer<String, String> kafkaProducer, String topic) {
        this.kafkaProducer = kafkaProducer;
        this.topic = topic;
    }

    @Override
    public void onOpen() {
    }

    @Override
    public void onClosed() {
        // release the producer once the stream is closed
        kafkaProducer.close();
    }

    @Override
    public void onMessage(String event, MessageEvent messageEvent) {
        // log the raw event payload, then forward it to Kafka unchanged
        logger.error(messageEvent.getData());
        kafkaProducer.send(new ProducerRecord<>(topic, messageEvent.getData()));
    }

    @Override
    public void onComment(String comment) {
    }

    @Override
    public void onError(Throwable t) {
        logger.error("Stream Reading Failure!", t);
    }
}

WikimediaChangeProducer

import com.launchdarkly.eventsource.EventSource;
import com.launchdarkly.eventsource.EventHandler;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;

import java.net.URI;
import java.time.Duration;
import java.util.Properties;
import java.util.concurrent.TimeUnit;

public class WikimediaChangeProducer {

    public static void main(String[] args) throws InterruptedException {
        String bootstrapServers = "192.168.0.123:9092";

        // create Producer properties
        Properties properties = new Properties();
        properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        properties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        KafkaProducer<String, String> kafkaProducer = new KafkaProducer<String, String>(properties);

        String topic = "wikimedia.recentchange";
        EventHandler eventHandler = new WikimediaChangeHandler(kafkaProducer, topic);
        String url = "https://stream.wikimedia.org/v2/stream/recentchange";
        EventSource.Builder builder = new EventSource.Builder(eventHandler, URI.create(url));
        builder.connectTimeout(Duration.ofMinutes(10));
        // Note: a proxy is required where the Wikimedia stream is not directly reachable
        builder.proxy("127.0.0.1", 1080);
        EventSource eventSource = builder.build();

        // start the producer in another thread
        eventSource.start();

        // we produce for 10 minutes and block the program until then
        TimeUnit.MINUTES.sleep(10);
    }
}

Consumer

Consumer Dependency

<properties>
    <opensearch.java>2.10.1</opensearch.java>
</properties>

<dependency>
    <groupId>org.opensearch.client</groupId>
    <artifactId>opensearch-java</artifactId>
    <version>${opensearch.java}</version>
</dependency>
<dependency>
    <groupId>org.apache.httpcomponents.client5</groupId>
    <artifactId>httpclient5</artifactId>
</dependency>
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-log4j2</artifactId>
</dependency>

OpenSearchConsumer

import java.io.IOException;
import java.lang.invoke.MethodHandles;
import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;

import org.apache.hc.client5.http.impl.nio.PoolingAsyncClientConnectionManager;
import org.apache.hc.client5.http.impl.nio.PoolingAsyncClientConnectionManagerBuilder;
import org.apache.hc.core5.http.HttpHost;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.opensearch.client.opensearch.OpenSearchClient;
import org.opensearch.client.opensearch._types.OpenSearchException;
import org.opensearch.client.opensearch.core.IndexRequest;
import org.opensearch.client.opensearch.core.IndexResponse;
import org.opensearch.client.opensearch.indices.CreateIndexRequest;
import org.opensearch.client.opensearch.indices.DeleteIndexRequest;
import org.opensearch.client.opensearch.indices.ExistsRequest;
import org.opensearch.client.transport.OpenSearchTransport;
import org.opensearch.client.transport.endpoints.BooleanResponse;
import org.opensearch.client.transport.httpclient5.ApacheHttpClient5TransportBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class OpenSearchConsumer {

    private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());

    public static OpenSearchClient connect(String scheme, String hostName, int port) {
        final HttpHost host = new HttpHost(scheme, hostName, port);
        final ApacheHttpClient5TransportBuilder builder = ApacheHttpClient5TransportBuilder.builder(host);
        builder.setHttpClientConfigCallback(hcb -> {
            final PoolingAsyncClientConnectionManager manager =
                    PoolingAsyncClientConnectionManagerBuilder.create().build();
            return hcb.setConnectionManager(manager);
        });
        final OpenSearchTransport transport = builder.build();
        return new OpenSearchClient(transport);
    }

    public static OpenSearchClient connect() {
        return connect("http", "192.168.0.123", 9200);
    }

    public static boolean exist(OpenSearchClient client, String indexName)
            throws OpenSearchException, IOException {
        var existRequest = ExistsRequest.of(fn -> fn.index(indexName));
        BooleanResponse exist = client.indices().exists(existRequest);
        return exist.value();
    }

    public static void createIndex(OpenSearchClient client, String indexName)
            throws OpenSearchException, IOException {
        var exist = exist(client, indexName);
        if (exist) {
            System.out.printf("index %s already exists!\n", indexName);
        } else {
            var createRequest = new CreateIndexRequest.Builder().index(indexName).build();
            client.indices().create(createRequest);
        }
    }

    //GET /_cat/indices?v
    public static void deleteIndex(OpenSearchClient client, String indexName)
            throws OpenSearchException, IOException {
        var exist = exist(client, indexName);
        if (!exist) {
            System.out.printf("index %s does not exist!\n", indexName);
        } else {
            var deleteRequest = new DeleteIndexRequest.Builder().index(indexName).build();
            client.indices().delete(deleteRequest);
        }
    }

    public static KafkaConsumer<String, String> createKafkaConsumer() {
        var bootstrapServer = "192.168.0.123:9092";
        var groupId = "group-wikimedia-opensearch";
        Properties prop = new Properties();
        prop.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServer);
        prop.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        prop.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        prop.setProperty(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        //earliest,latest etc.
        prop.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
        return new KafkaConsumer<>(prop);
    }

    public static void consume(OpenSearchClient client, String indexName)
            throws OpenSearchException, IOException {
        var consumer = createKafkaConsumer();
        var topic = "wikimedia.recentchange";
        consumer.subscribe(Arrays.asList(topic));
        while (true) {
            var consumerRecord = consumer.poll(Duration.ofMillis(3000));
            int recordCount = consumerRecord.count();
            // SLF4J uses {} placeholders, not printf-style %d
            logger.info("received {} records", recordCount);
            for (ConsumerRecord<String, String> cr : consumerRecord) {
                //send record into OpenSearch
                IndexData indexData = new IndexData(cr.value());
                var indexRequest = new IndexRequest.Builder<IndexData>()
                        .index(indexName)
                        .document(indexData)
                        .build();
                IndexResponse response = client.index(indexRequest);
                System.out.println(response.id());
            }
        }
    }

    public static void main(String[] sa) throws OpenSearchException, IOException {
        var client = connect();
        var indexName = "wikimedia-opensearch";
        //createIndex(client, indexName);
        System.out.println("consuming start...");
        consume(client, indexName);
        System.out.println("consuming end...");
    }

    // Wrapper document: the raw event JSON is stored as a single string field.
    static class IndexData {

        private String wikiMediaValue;

        public IndexData(String wikiMediaValue) {
            this.wikiMediaValue = wikiMediaValue;
        }

        @Override
        public String toString() {
            return String.format("IndexData{wikiMediaValue='%s'}", wikiMediaValue);
        }

        public String getWikiMediaValue() {
            return wikiMediaValue;
        }

        public void setWikiMediaValue(String wikiMediaValue) {
            this.wikiMediaValue = wikiMediaValue;
        }
    }
}
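
The consumer lets OpenSearch generate an ID for every document, so a record that is delivered twice (for example after a consumer restart) would be indexed twice. One common way around this, which the original code does not use, is to derive a deterministic ID from the record's Kafka coordinates. The sketch below shows only the ID construction; `RecordIdSketch` and `documentId` are hypothetical names. In the consumer loop the three arguments would come from `cr.topic()`, `cr.partition()` and `cr.offset()`, and the result would be passed to the request builder's `id(...)` method.

```java
class RecordIdSketch {
    // Builds an ID that stays the same when the same Kafka record is delivered again.
    static String documentId(String topic, int partition, long offset) {
        return topic + "_" + partition + "_" + offset;
    }

    public static void main(String[] args) {
        // Illustrative coordinates, not taken from a real run.
        System.out.println(documentId("wikimedia.recentchange", 0, 42L));
    }
}
```

With a stable ID, indexing the same record again overwrites the existing document instead of creating a duplicate.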

Testing

Create Topic

  • Create the topic, then start WikimediaChangeProducer;
  • Note: a proxy is required where the Wikimedia stream is not directly reachable;
./kafka-topics.sh \
  --bootstrap-server 192.168.0.123:9092 \
  --topic wikimedia.recentchange --create \
  --partitions 3 --replication-factor 1
20:52:31.936 [okhttp-eventsource-events-[]-0] ERROR com.elf.kafka.producer.WikimediaChangeHandler - {"$schema":"/mediawiki/recentchange/1.0.0","meta":{"uri":"https://ru.wikipedia.org/wiki/%D0%A3%D1%87%D0%B0%D1%81%D1%82%D0%BD%D0%B8%D0%BA:Sherbek_Qarshiyev","request_id":"68d8c10a-31dc-44f3-8b1d-cb977dfd1602","id":"93bf8bf8-4c36-4a49-b226-3a9311d2c906","dt":"2024-04-25T12:16:20Z","domain":"ru.wikipedia.org","stream":"mediawiki.recentchange","topic":"eqiad.mediawiki.recentchange","partition":0,"offset":5049257037},"id":484017699,"type":"log","namespace":2,"title":"Участник:Sherbek Qarshiyev","title_url":"https://ru.wikipedia.org/wiki/%D0%A3%D1%87%D0%B0%D1%81%D1%82%D0%BD%D0%B8%D0%BA:Sherbek_Qarshiyev","comment":"Carn removed Carn from mentorship","timestamp":1714047380,"user":"Carn","bot":true,"log_id":101062090,"log_type":"growthexperiments","log_action":"setmentor","log_params":{"previous-mentor":"Carn","new-mentor":"Birulik"},"log_action_comment":"Carn установил Birulik как наставницу для Sherbek Qarshiyev (предыдущий наставник Carn): Carn removed Carn from mentorship","server_url":"https://ru.wikipedia.org","server_name":"ru.wikipedia.org","server_script_path":"/w","wiki":"ruwiki","parsedcomment":"Carn removed Carn from mentorship"}
20:52:31.936 [okhttp-eventsource-events-[]-0] ERROR com.elf.kafka.producer.WikimediaChangeHandler - {"$schema":"/mediawiki/recentchange/1.0.0","meta":{"uri":"https://www.wikidata.org/wiki/Q46898283","request_id":"161daa3c-9f11-44f1-b042-209a3acabcf8","id":"65fb4ae3-86fd-47ab-a85f-f0dd26b04b66","dt":"2024-04-25T12:52:32Z","domain":"www.wikidata.org","stream":"mediawiki.recentchange","topic":"eqiad.mediawiki.recentchange","partition":0,"offset":5049257038},"id":2201214347,"type":"edit","namespace":0,"title":"Q46898283","title_url":"https://www.wikidata.org/wiki/Q46898283","comment":"/* wbsetclaimvalue:1| */ [[Property:P1476]]: Guidelines for diagnosis and therapy of patients with asthma 2005. The most important aspects for adults","timestamp":1714049552,"user":"KrBot","bot":true,"notify_url":"https://www.wikidata.org/w/index.php?diff=2136975570&oldid=2136975561&rcid=2201214347","minor":false,"patrolled":true,"length":{"old":60872,"new":60869},"revision":{"old":2136975561,"new":2136975570},"server_url":"https://www.wikidata.org","server_name":"www.wikidata.org","server_script_path":"/w","wiki":"wikidatawiki","parsedcomment":"<span dir=\"auto\"><span class=\"autocomment\">Определено значение для утверждения: </span></span> <a href=\"/wiki/Property:P1476\" title=\"название | название произведения (книги, фильма, газетной статьи, произведения исполнительского искусства, веб-сайта)\"><span class=\"wb-itemlink\"><span class=\"wb-itemlink-label\" lang=\"ru\" dir=\"ltr\">название</span> <span class=\"wb-itemlink-id\">(P1476)</span></span></a>: Guidelines for diagnosis and therapy of patients with asthma 2005. The most important aspects for adults"}
20:52:31.936 [okhttp-eventsource-events-[]-0] ERROR com.elf.kafka.producer.WikimediaChangeHandler - {"$schema":"/mediawiki/recentchange/1.0.0","meta":{"uri":"https://en.wikipedia.org/wiki/User:Ali_Ahwazi/sandbox2","request_id":"4c136271-a6cd-4ff6-a6b7-429d769ba5ba","id":"14fcefb7-1f73-40a9-9acc-539d97aa06c3","dt":"2024-04-25T12:52:32Z","domain":"en.wikipedia.org","stream":"mediawiki.recentchange","topic":"eqiad.mediawiki.recentchange","partition":0,"offset":5049257039},"id":1769512861,"type":"edit","namespace":2,"title":"User:Ali Ahwazi/sandbox2","title_url":"https://en.wikipedia.org/wiki/User:Ali_Ahwazi/sandbox2","comment":"","timestamp":1714049552,"user":"Ali Ahwazi","bot":false,"notify_url":"https://en.wikipedia.org/w/index.php?diff=1220709553&oldid=1220709505","minor":false,"length":{"old":26671,"new":31548},"revision":{"old":1220709505,"new":1220709553},"server_url":"https://en.wikipedia.org","server_name":"en.wikipedia.org","server_script_path":"/w","wiki":"enwiki","parsedcomment":""}
20:52:31.936 [okhttp-eventsource-events-[]-0] ERROR com.elf.kafka.producer.WikimediaChangeHandler - {"$schema":"/mediawiki/recentchange/1.0.0","meta":{"uri":"https://fr.wikipedia.org/wiki/Portail:Ch%C3%A2teaux/Articles_r%C3%A9cents","request_id":"be816a4f-be27-4c49-830a-31161665401f","id":"ec47eca3-b6a4-4b4f-ac71-b44369de940d","dt":"2024-04-25T12:52:33Z","domain":"fr.wikipedia.org","stream":"mediawiki.recentchange","topic":"eqiad.mediawiki.recentchange","partition":0,"offset":5049257040},"id":519472837,"type":"edit","namespace":100,"title":"Portail:Châteaux/Articles récents","title_url":"https://fr.wikipedia.org/wiki/Portail:Ch%C3%A2teaux/Articles_r%C3%A9cents","comment":"+ [[Château de Mielmont]]","timestamp":1714049553,"user":"OrlodrimBot","bot":true,"notify_url":"https://fr.wikipedia.org/w/index.php?diff=214561930&oldid=214559720&rcid=519472837","minor":false,"patrolled":true,"length":{"old":779,"new":779},"revision":{"old":214559720,"new":214561930},"server_url":"https://fr.wikipedia.org","server_name":"fr.wikipedia.org","server_script_path":"/w","wiki":"frwiki","parsedcomment":"+ <a href=\"/wiki/Ch%C3%A2teau_de_Mielmont\" title=\"Château de Mielmont\">Château de Mielmont</a>"}
20:52:31.936 [okhttp-eventsource-events-[]-0] ERROR com.elf.kafka.producer.WikimediaChangeHandler - {"$schema":"/mediawiki/recentchange/1.0.0","meta":{"uri":"https://ru.wikipedia.org/wiki/%D0%A3%D1%87%D0%B0%D1%81%D1%82%D0%BD%D0%B8%D0%BA:%D0%92%D0%B8%D0%BA%D1%82%D0%BE%D1%80%D0%B8%D1%8F_%D0%9D%D0%B8%D0%BA%D0%B8%D1%82%D0%B5%D0%BD%D0%BA%D0%BE","request_id":"68d8c10a-31dc-44f3-8b1d-cb977dfd1602","id":"b8ab3286-f69f-466a-9a00-c5c12c176001","dt":"2024-04-25T12:16:20Z","domain":"ru.wikipedia.org","stream":"mediawiki.recentchange","topic":"eqiad.mediawiki.recentchange","partition":0,"offset":5049257041},"id":484017700,"type":"log","namespace":2,"title":"Участник:Виктория Никитенко","title_url":"https://ru.wikipedia.org/wiki/%D0%A3%D1%87%D0%B0%D1%81%D1%82%D0%BD%D0%B8%D0%BA:%D0%92%D0%B8%D0%BA%D1%82%D0%BE%D1%80%D0%B8%D1%8F_%D0%9D%D0%B8%D0%BA%D0%B8%D1%82%D0%B5%D0%BD%D0%BA%D0%BE","comment":"Carn removed Carn from mentorship","timestamp":1714047380,"user":"Carn","bot":true,"log_id":101062091,"log_type":"growthexperiments","log_action":"setmentor","log_params":{"previous-mentor":"Carn","new-mentor":"Birulik"},"log_action_comment":"Carn установил Birulik как наставницу для Виктория Никитенко (предыдущий наставник Carn): Carn removed Carn from mentorship","server_url":"https://ru.wikipedia.org","server_name":"ru.wikipedia.org","server_script_path":"/w","wiki":"ruwiki","parsedcomment":"Carn removed Carn from mentorship"}
20:52:31.937 [okhttp-eventsource-events-[]-0] ERROR com.elf.kafka.producer.WikimediaChangeHandler - {"$schema":"/mediawiki/recentchange/1.0.0","meta":{"uri":"https://oc.wikipedia.org/wiki/Pairac_lo_Chasteu","request_id":"739f7d34-fd23-4b37-ab77-87c95663aeda","id":"34ab12f5-7256-48e5-a4f8-b40bf95316ad","dt":"2024-04-25T12:52:33Z","domain":"oc.wikipedia.org","stream":"mediawiki.recentchange","topic":"eqiad.mediawiki.recentchange","partition":0,"offset":5049257042},"id":10815379,"type":"new","namespace":0,"title":"Pairac lo Chasteu","title_url":"https://oc.wikipedia.org/wiki/Pairac_lo_Chasteu","comment":"Redireccion cap a [[Pairac (lo Chasteu)]]","timestamp":1714049553,"user":"PairacLoChasteu","bot":false,"notify_url":"https://oc.wikipedia.org/w/index.php?oldid=2436522&rcid=10815379","minor":false,"patrolled":false,"length":{"new":32},"revision":{"new":2436522},"server_url":"https://oc.wikipedia.org","server_name":"oc.wikipedia.org","server_script_path":"/w","wiki":"ocwiki","parsedcomment":"Redireccion cap a <a href=\"/wiki/Pairac_(lo_Chasteu)\" title=\"Pairac (lo Chasteu)\">Pairac (lo Chasteu)</a>"}
20:52:31.938 [okhttp-eventsource-events-[]-0] ERROR com.elf.kafka.producer.WikimediaChangeHandler - {"$schema":"/mediawiki/recentchange/1.0.0","meta":{"uri":"https://fr.wikipedia.org/wiki/Cat%C3%A9gorie:Article_%C3%A0_r%C3%A9f%C3%A9rence_n%C3%A9cessaire","request_id":"7d6530d5-10a3-4628-a101-bc2e75b9a92f","id":"2eb4865e-b4ee-407c-9e19-b871967ed9e1","dt":"2024-04-25T12:52:30Z","domain":"fr.wikipedia.org","stream":"mediawiki.recentchange","topic":"eqiad.mediawiki.recentchange","partition":0,"offset":5049257043},"id":519472838,"type":"categorize","namespace":14,"title":"Catégorie:Article à référence nécessaire","title_url":"https://fr.wikipedia.org/wiki/Cat%C3%A9gorie:Article_%C3%A0_r%C3%A9f%C3%A9rence_n%C3%A9cessaire","comment":"[[:20e armée (Union soviétique)]] ajoutée à la catégorie","timestamp":1714049550,"user":"Le Petit Chat","bot":false,"notify_url":"https://fr.wikipedia.org/w/index.php?diff=214561929&oldid=209346180&rcid=519472838","server_url":"https://fr.wikipedia.org","server_name":"fr.wikipedia.org","server_script_path":"/w","wiki":"frwiki","parsedcomment":"<a href=\"/wiki/20e_arm%C3%A9e_(Union_sovi%C3%A9tique)\" title=\"20e armée (Union soviétique)\">20e armée (Union soviétique)</a> ajoutée à la catégorie"}
20:52:31.939 [okhttp-eventsource-events-[]-0] ERROR com.elf.kafka.producer.WikimediaChangeHandler - {"$schema":"/mediawiki/recentchange/1.0.0","meta":{"uri":"https://commons.wikimedia.org/wiki/Category:Milford,_Derbyshire","request_id":"3d4237d1-1994-4c65-b41a-84ee1f1a05c6","id":"d9e3f133-2e22-4022-9449-65a78ed83452","dt":"2024-04-25T12:52:31Z","domain":"commons.wikimedia.org","stream":"mediawiki.recentchange","topic":"eqiad.mediawiki.recentchange","partition":0,"offset":5049257044},"id":2478318778,"type":"categorize","namespace":14,"title":"Category:Milford, Derbyshire","title_url":"https://commons.wikimedia.org/wiki/Category:Milford,_Derbyshire","comment":"[[:File:The King William pub - geograph.org.uk - 5560373.jpg]] added to category","timestamp":1714049551,"user":"WereSpielChequers","bot":false,"notify_url":"https://commons.wikimedia.org/w/index.php?diff=871189763&oldid=871189716&rcid=2478318778","server_url":"https://commons.wikimedia.org","server_name":"commons.wikimedia.org","server_script_path":"/w","wiki":"commonswiki","parsedcomment":"<a href=\"/wiki/File:The_King_William_pub_-_geograph.org.uk_-_5560373.jpg\" title=\"File:The King William pub - geograph.org.uk - 5560373.jpg\">File:The King William pub - geograph.org.uk - 5560373.jpg</a> added to category"}
20:52:31.939 [okhttp-eventsource-events-[]-0] ERROR com.elf.kafka.producer.WikimediaChangeHandler - {"$schema":"/mediawiki/recentchange/1.0.0","meta":{"uri":"https://ko.wikipedia.org/wiki/%EC%A1%B0%EC%83%81%ED%99%98","request_id":"c408c6e9-7e92-4f96-8a99-1d518f3af5ed","id":"e94ece30-3416-41e9-860d-d3547f2248d6","dt":"2024-04-25T12:52:33Z","domain":"ko.wikipedia.org","stream":"mediawiki.recentchange","topic":"eqiad.mediawiki.recentchange","partition":0,"offset":5049257045},"type":"log","namespace":0,"title":"조상환","title_url":"https://ko.wikipedia.org/wiki/%EC%A1%B0%EC%83%81%ED%99%98","comment":"","timestamp":1714049553,"user":"Cho Sang Hwan","bot":false,"log_id":0,"log_type":"abusefilter","log_action":"hit","log_params":{"action":"edit","filter":"71","actions":"tag","log":1521905},"log_action_comment":"Cho Sang Hwan님이 [[조상환]]에서 \"edit\" 동작을 하여 [[특수:편집필터/71|필터 71]]이(가) 작동하였습니다. 조치: 태그 ([[특수:편집필터기록/1521905|자세한 사항]])","server_url":"https://ko.wikipedia.org","server_name":"ko.wikipedia.org","server_script_path":"/w","wiki":"kowiki","parsedcomment":""}
20:52:31.940 [okhttp-eventsource-events-[]-0] ERROR com.elf.kafka.producer.WikimediaChangeHandler - {"$schema":"/mediawiki/recentchange/1.0.0","meta":{"uri":"https://os.wikipedia.org/wiki/%D0%9A%D0%B0%D1%82%D0%B5%D0%B3%D0%BE%D1%80%D0%B8:%D0%A5%D1%83%D1%81%D1%81%D0%B0%D1%80_%D0%93%D0%BE%D0%BB%D0%BB%D0%B0%D0%BD%D0%B4%D0%B8%D0%B9%D1%8B_%D1%87%D0%B8_%D0%B0%D0%BC%D0%B0%D1%80%D0%B4%D0%B8%D1%81,_%D1%83%D1%8B%D0%B4%D0%BE%D0%BD","request_id":"d2dfebf5-df74-43d5-860e-be964ee93420","id":"27e197b2-cfb1-4c65-9717-09bb25438f08","dt":"2024-04-25T12:52:33Z","domain":"os.wikipedia.org","stream":"mediawiki.recentchange","topic":"eqiad.mediawiki.recentchange","partition":0,"offset":5049257046},"id":1911685,"type":"new","namespace":14,"title":"Категори:Хуссар Голландийы чи амардис, уыдон","title_url":"https://os.wikipedia.org/wiki/%D0%9A%D0%B0%D1%82%D0%B5%D0%B3%D0%BE%D1%80%D0%B8:%D0%A5%D1%83%D1%81%D1%81%D0%B0%D1%80_%D0%93%D0%BE%D0%BB%D0%BB%D0%B0%D0%BD%D0%B4%D0%B8%D0%B9%D1%8B_%D1%87%D0%B8_%D0%B0%D0%BC%D0%B0%D1%80%D0%B4%D0%B8%D1%81,_%D1%83%D1%8B%D0%B4%D0%BE%D0%BD","comment":"Ног фарс, йæ код райдайы афтæ: «[[Категори:Нидерландты чи амардис, уыдон]] [[Категори:Хуссар Голландийы зындгонд адæм|Амард]]»","timestamp":1714049553,"user":"Taamu","bot":false,"notify_url":"https://os.wikipedia.org/w/index.php?oldid=558822&rcid=1911685","minor":false,"patrolled":true,"length":{"new":167},"revision":{"new":558822},"server_url":"https://os.wikipedia.org","server_name":"os.wikipedia.org","server_script_path":"/w","wiki":"oswiki","parsedcomment":"Ног фарс, йæ код райдайы афтæ: «<a href=\"/wiki/%D0%9A%D0%B0%D1%82%D0%B5%D0%B3%D0%BE%D1%80%D0%B8:%D0%9D%D0%B8%D0%B4%D0%B5%D1%80%D0%BB%D0%B0%D0%BD%D0%B4%D1%82%D1%8B_%D1%87%D0%B8_%D0%B0%D0%BC%D0%B0%D1%80%D0%B4%D0%B8%D1%81,_%D1%83%D1%8B%D0%B4%D0%BE%D0%BD\" title=\"Категори:Нидерландты чи амардис, уыдон\">Категори:Нидерландты чи амардис, уыдон</a> <a href=\"/wiki/%D0%9A%D0%B0%D1%82%D0%B5%D0%B3%D0%BE%D1%80%D0%B8:%D0%A5%D1%83%D1%81%D1%81%D0%B0%D1%80_%D0%93%D0%BE%D0%BB%D0%BB%D0%B0%D0%BD%D0%B4%D0%B8%D0%B9%D1%8B_%D0%B7%D1%8B%D0%BD%D0%B4%D0%B3%D0%BE%D0%BD%D0%B4_%D0%B0%D0%B4%C3%A6%D0%BC\" title=\"Категори:Хуссар Голландийы зындгонд адæм\">Амард»</a>"}
……

Consume Message

  • Start OpenSearchConsumer
# this step is optional
./kafka-console-consumer.sh --bootstrap-server 192.168.0.123:9092 \
  --topic wikimedia.recentchange --from-beginning
GET /_cat/indices?v

GET _search
{
  "query": {
    "match_all": {}
  }
}

GET /index_name/_search
{
  "query": {
    "match_all": {}
  }
}
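
The same checks can be issued from Java with the JDK's built-in `java.net.http` client instead of the Dashboards console. This is a minimal sketch, assuming the host `192.168.0.123:9200` and the index name `wikimedia-opensearch` used earlier in this post; the class name `SearchRequestSketch` is hypothetical, and the search is sent as POST, which the `_search` endpoint accepts alongside GET. Requests are only sent when the program is started with the argument `send`, so it can be run without a live cluster.

```java
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

class SearchRequestSketch {
    static final String BASE = "http://192.168.0.123:9200"; // host used throughout this post

    // GET /_cat/indices?v : lists indices with a header row.
    static HttpRequest listIndices() {
        return HttpRequest.newBuilder(URI.create(BASE + "/_cat/indices?v")).GET().build();
    }

    // POST /<index>/_search with a match_all query.
    static HttpRequest matchAll(String indexName) {
        String body = "{\"query\": {\"match_all\": {}}}";
        return HttpRequest.newBuilder(URI.create(BASE + "/" + indexName + "/_search"))
                .header("Content-Type", "application/json")
                .POST(HttpRequest.BodyPublishers.ofString(body))
                .build();
    }

    public static void main(String[] args) throws Exception {
        HttpRequest[] requests = {listIndices(), matchAll("wikimedia-opensearch")};
        boolean send = args.length > 0 && "send".equals(args[0]);
        HttpClient client = HttpClient.newHttpClient();
        for (HttpRequest request : requests) {
            System.out.println(request.method() + " " + request.uri());
            if (send) { // needs the OpenSearch container to be reachable
                HttpResponse<String> response =
                        client.send(request, HttpResponse.BodyHandlers.ofString());
                System.out.println(response.statusCode());
                System.out.println(response.body());
            }
        }
    }
}
```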

