温馨提示×

温馨提示×

您好,登录后才能下订单哦!

密码登录×
登录注册×
其他方式登录
点击 登录注册 即表示同意《亿速云用户服务条款》

go语言怎么实现Elasticsearch批量修改查询及发送MQ

发布时间:2022-04-19 15:19:08 来源:亿速云 阅读:249 作者:iii 栏目:开发技术

这篇文章介绍“go语言怎么实现Elasticsearch批量修改查询及发送MQ”。其中的知识点很多人还不太熟悉,所以小编给大家总结了以下内容:用 update_by_query 批量修改文档、为索引添加字段,以及查询 ES 并把结果发送到 MQ。需要说明的是,文中的示例由 Elasticsearch 控制台请求和 Python(elasticsearch-py + Celery)代码组成。内容详细,步骤清晰,具有一定的借鉴价值,希望大家阅读完这篇文章能有所收获,下面我们一起来看看吧。

update_by_query批量修改

# Reset liked_count to 0 on every "toutiao" post document that has the field,
# across the three monthly indices listed in the request path.
POST post-v1_1-2021.02,post-v1_1-2021.03,post-v1_1-2021.04/_update_by_query
{
  "query": {
    "bool": {
      "must": [
        { "term": { "join_field": { "value": "post" } } },
        { "term": { "platform": { "value": "toutiao" } } },
        { "exists": { "field": "liked_count" } }
      ]
    }
  },
  "script": {
    "source": "ctx._source.liked_count=0",
    "lang": "painless"
  }
}

索引添加字段

# Add fields to the typed mapping of user_tiktok.
# A put-mapping body must be rooted at "properties"; the field definitions
# cannot sit at the top level of the body.
# NOTE(review): post_signature is declared here as an object with two keyword
# sub-fields, which keeps every name from the original request. If it was
# meant to be a plain keyword field instead, adjust accordingly.
PUT user_tiktok/_doc/_mapping?include_type_name=true
{
  "properties": {
    "post_signature": {
      "properties": {
        "StuClass": {
          "type": "keyword"
        },
        "post_token": {
          "type": "keyword"
        }
      }
    }
  }
}

# Add a text field to user_toutiao (typeless mapping API).
PUT user_toutiao/_mapping
{
  "properties": {
    "user_token": {
      "type": "text"
    }
  }
}

查询es发送MQ

"""Scan Elasticsearch for TikTok user documents and publish Celery tasks to MQ."""
import logging

import arrow
import pytz
import redis
from celery import Celery
from elasticsearch import Elasticsearch
from elasticsearch.helpers import scan, streaming_bulk
from kombu import Exchange, Queue, binding

# NOTE(review): the Redis password is hard-coded; consider loading it from the
# environment or a secrets store before using this outside a demo.
pool_16_8 = redis.ConnectionPool(host='10.0.3.100', port=6379, db=8, password='EfcHGSzKqg6cfzWq')
rds_16_8 = redis.StrictRedis(connection_pool=pool_16_8)

# Enable INFO-level output on the 'elasticsearch' logger.
es_logger = logging.getLogger('elasticsearch')
es_logger.disabled = False
es_logger.setLevel(logging.INFO)

es_zoo_connection = Elasticsearch('http://eswriter:e s密码@e sip:4000', dead_timeout=10,
                                  retry_on_timeout=True)

logger = logging.getLogger(__name__)


class ES(object):
    """Classmethod-only helper around the Elasticsearch client stored in ``es``.

    Subclasses configure the target by overriding the class attributes below.
    """

    index = None  # base index name
    doc_type = None  # document type passed to the client calls
    id_field = '_id'  # key popped from a doc to obtain its id
    version = ''  # optional version segment of a derived index name
    source_id_field = ''  # fallback key read when ``id_field`` is absent/empty
    aliase_field = ''  # doc key whose value selects the derived index
    separator = '-'  # separator between index name segments
    aliase_func = None  # callable turning the alias value into an index suffix
    es = None  # Elasticsearch client instance
    tz = pytz.timezone('Asia/Shanghai')
    logger = logger

    @classmethod
    def mget(cls, ids=None, index=None, **kwargs):
        """Fetch several documents by id; returns the client's raw response."""
        index = index or cls.index
        return cls.es.mget(body={'ids': ids}, doc_type=cls.doc_type, index=index, **kwargs)

    @classmethod
    def count(cls, query=None, index=None, **kwargs):
        """Return the ``count`` value of a count request (0 if missing)."""
        index = index or cls.index
        response = cls.es.count(doc_type=cls.doc_type, body=query, index=index, **kwargs)
        return response.get('count', 0)

    @classmethod
    def upsert(cls, doc, doc_id=None, index=None, doc_as_upsert=True, **kwargs):
        """Partially update one document, creating it when ``doc_as_upsert``.

        When ``doc_id`` is not given the id is taken from ``doc`` via
        :meth:`id_name`, which may pop ``id_field`` out of ``doc``.
        """
        body = {"doc": doc}
        if doc_as_upsert:
            body['doc_as_upsert'] = True
        # Resolve the id before the index, as the id lookup can mutate ``doc``.
        target_id = doc_id or cls.id_name(doc)
        index = index or cls.index_name(doc)
        # Keyword arguments: the positional order of ``update`` is not the same
        # in every elasticsearch-py release, so positional passing is fragile.
        cls.es.update(index=index, id=target_id, doc_type=cls.doc_type, body=body, **kwargs)

    @classmethod
    def search(cls, index=None, query=None, **kwargs):
        """Run a search request and return the client's raw response."""
        index = index or cls.index
        return cls.es.search(index=index, body=query, **kwargs)

    @classmethod
    def scan(cls, query, index=None, **kwargs):
        """Return the ``helpers.scan`` iterator over all hits of ``query``."""
        return scan(cls.es, query=query, index=index or cls.index, **kwargs)

    @classmethod
    def index_name(cls, doc):
        """Return the index a document belongs to.

        If ``aliase_field`` is set and present in ``doc``, its value (parsed
        with ``arrow`` when it is a string, converted to ``cls.tz`` when it is
        an int) is passed to ``aliase_func`` and the result is appended to the
        base index, after ``version`` when one is set. Otherwise the base
        index is returned unchanged.
        """
        if not (cls.aliase_field and cls.aliase_field in doc):
            return cls.index

        aliase_part = doc[cls.aliase_field]
        if isinstance(aliase_part, str):
            aliase_part = arrow.get(aliase_part)
        if isinstance(aliase_part, int):
            aliase_part = arrow.get(aliase_part).astimezone(cls.tz)

        suffix = cls.aliase_func(aliase_part)
        if cls.version:
            return '{}{}{}{}{}'.format(cls.index, cls.separator, cls.version, cls.separator, suffix)
        return '{}{}{}'.format(cls.index, cls.separator, suffix)

    @classmethod
    def id_name(cls, doc):
        """Return the id of ``doc``.

        A truthy ``id_field`` value is popped from ``doc`` and returned;
        otherwise the ``source_id_field`` value is used.

        Raises:
            AssertionError: if neither key yields a truthy id.
        """
        if doc.get(cls.id_field):
            doc_id = doc.pop(cls.id_field)
        else:
            doc_id = doc.get(cls.source_id_field)
        if not doc_id:
            cls.logger.error('document has no usable id: %s', doc)
            # Raised explicitly so the check survives ``python -O``.
            raise AssertionError('doc _id must not be None')
        return doc_id

    @classmethod
    def bulk_upsert(cls, docs, **kwargs):
        """Bulk-write documents; only ``index`` and ``update`` are supported.

        Recognised keyword arguments: ``op_type`` (default ``'update'``),
        ``chunk_size`` (default 500) and ``upsert`` (default ``True``, only
        meaningful for ``update``).

        Returns the ``streaming_bulk`` iterator; nothing is sent until it is
        consumed.
        """
        op_type = kwargs.get('op_type') or 'update'
        # Passing None through would drop the per-chunk document limit, so
        # fall back to 500 (believed to be the helper's own default).
        chunk_size = kwargs.get('chunk_size') or 500
        if op_type == 'update':
            upsert = kwargs.get('upsert')
            if upsert is None:
                upsert = True
        else:
            upsert = False
        actions = cls._gen_bulk_actions(docs, cls.index_name, cls.doc_type, cls.id_name, op_type, upsert=upsert)
        return streaming_bulk(cls.es, actions, chunk_size=chunk_size, raise_on_error=False,
                              raise_on_exception=False, max_retries=5, request_timeout=25)

    @classmethod
    def _gen_bulk_actions(cls, docs, index_name, doc_type, id_name, op_type, upsert=True, **kwargs):
        """Yield one bulk action dict per document.

        ``index_name`` and ``id_name`` may each be a fixed value or a callable
        taking the document. Documents are skipped when ``op_type`` is neither
        ``'index'`` nor ``'update'``.

        Raises:
            AssertionError: on first iteration, if ``upsert`` is requested
                with an ``op_type`` other than ``'update'``.
        """
        if upsert and op_type != 'update':
            raise AssertionError('upsert should use "update" as op_type')

        for doc in docs:
            # index_name may be a factory function.
            index = index_name(doc) if callable(index_name) else index_name

            if op_type == 'index':
                _source = doc
            elif op_type == 'update' and not upsert:
                _source = {'doc': doc}
            elif op_type == 'update' and upsert:
                _source = {'doc': doc, 'doc_as_upsert': True}
            else:
                continue

            # Resolved after _source is chosen; id_name may pop the id key
            # from ``doc``, which _source still references.
            doc_id = id_name(doc) if callable(id_name) else id_name

            # Build the bulk action.
            yield {
                "_op_type": op_type,
                "_index": index,
                "_type": doc_type,
                "_id": doc_id,
                "_source": _source,
            }


class tiktokEsUser(ES):
    """ES helper bound to the ``user_tiktok`` index."""

    index = 'user_tiktok'
    doc_type = '_doc'
    id_field = '_id'
    source_id_field = 'user_id'
    es = es_zoo_connection


def data_es_route_task_spider(name, args, kwargs, options, task=None, **kw):
    """Celery router: send every task to the ``tiktok`` topic exchange,
    using the task name as routing key."""
    return {
        'exchange': 'tiktok',
        'exchange_type': 'topic',
        'routing_key': name
    }


def _lazy_topic_queue(exchange, routing_key):
    """Build a lazy-mode queue named ``routing_key`` and bound with that key."""
    return Queue(
        routing_key,
        [binding(exchange, routing_key=routing_key)],
        queue_arguments={'x-queue-mode': 'lazy'},
    )


class DataEsConfig_download(object):
    """Celery configuration object for the ``tiktok`` app."""

    broker_url = 'amqp://用户:密码@ip:端口/'
    task_ignore_result = True
    task_serializer = 'json'
    accept_content = ['json']
    task_default_queue = 'default'
    task_default_exchange = 'default'
    task_default_routing_key = 'default'
    exchange = Exchange('tiktok', type='topic')
    task_queues = [
        _lazy_topic_queue(exchange, 'tiktok.user_avatar.download'),
        _lazy_topic_queue(exchange, 'tiktok.post_avatar.download'),
        _lazy_topic_queue(exchange, 'tiktok.post.spider'),
        _lazy_topic_queue(exchange, 'tiktok.post.save'),
        _lazy_topic_queue(exchange, 'tiktok.user.save'),
        _lazy_topic_queue(exchange, 'tiktok.post_avatar.invalid'),
        _lazy_topic_queue(exchange, 'tiktok.user_avatar.invalid'),
        _lazy_topic_queue(exchange, 'tiktok.comment.save'),
    ]
    task_routes = (data_es_route_task_spider,)
    enable_utc = True
    timezone = "Asia/Shanghai"


# Celery app used to publish the tasks.
tiktok_app = Celery(
    'tiktok',
    include=[
        'task.tasks',
    ]
)
tiktok_app.config_from_object(DataEsConfig_download)


def send_post():
    """Task producer: publish one ``tiktok.post.spider`` task per matching user.

    Matches documents that have ``post_signature`` and ``following_num >= 1000``;
    each task receives the hit's ``_source`` as its only argument.
    """
    query = {
        "query": {
            "bool": {
                "must": [
                    {"exists": {"field": "post_signature"}},
                    {"range": {"following_num": {"gte": 1000}}},
                ]
            }
        },
        "_source": ["region", "sec_uid", "post_signature"]
    }
    # Alternative query kept from the original: restrict to one region.
    # query = {
    #     "query": {
    #         "bool": {
    #             "must": [
    #                 {"exists": {"field": "post_signature"}},
    #                 {"match": {"region": "MY"}}
    #             ]
    #         }
    #     },
    #     "_source": ["region", "sec_uid", "post_signature"]
    # }
    hits = tiktokEsUser.scan(query=query, scroll='30m', request_timeout=100)
    for hit in hits:
        tiktok_app.send_task('tiktok.post.spider', args=(hit['_source'],))


def send_sign_token():
    """Task producer: publish one ``tiktok.user.sign_token`` task per matching user.

    Same filter as :func:`send_post`, further limited to a fixed one-hour
    ``create_time`` window.
    """
    # NOTE(review): no queue for 'tiktok.user.sign_token' is declared in
    # DataEsConfig_download.task_queues; confirm a consumer binds one.
    query = {
        "query": {
            "bool": {
                "must": [
                    {"exists": {"field": "post_signature"}},
                    {"range": {"following_num": {"gte": 1000}}},
                    {
                        "range": {
                            "create_time": {
                                "gte": "2021-01-06T00:00:00",
                                "lte": "2021-01-06T01:00:00"
                            }
                        }
                    },
                ]
            }
        },
        "_source": ["user_id", "sec_uid"]
    }
    hits = tiktokEsUser.scan(query=query, scroll='30m', request_timeout=100)
    for hit in hits:
        tiktok_app.send_task('tiktok.user.sign_token', args=(hit['_source'],))


if __name__ == '__main__':
    send_post()
    # send_sign_token()

以上就是关于“go语言怎么实现Elasticsearch批量修改查询及发送MQ”这篇文章的内容,相信大家已经有了一定的了解,希望小编分享的内容对大家有帮助。若想了解更多相关的知识内容,请关注亿速云行业资讯频道。

向AI问一下细节

免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。

AI