tt

    技术2022-07-10  82

    """Read gift-bag tracking events for a player from Cloud Bigtable.

    Importing this module has side effects: it sets
    ``GOOGLE_APPLICATION_CREDENTIALS`` and creates a Bigtable client and
    instance handle.
    """
    import json
    import os
    import time

    import pandas as pd
    from google.cloud import bigtable
    from google.cloud.bigtable.row_filters import (
        ColumnQualifierRegexFilter,
        FamilyNameRegexFilter,
        RowFilterChain,
        RowFilterUnion,
    )
    from google.cloud.bigtable.row_set import RowSet

    from config import path_dir

    os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = path_dir() + "/train_data/p.json"
    client = bigtable.Client(project="heidao-market", admin=True)
    instance = client.instance("yotta-bigtable")

    # Columns whose cell values are UTF-8 text; every other column is decoded
    # as a big-endian signed integer.
    _TEXT_COLUMNS = (b'game:value_1', b'game:position')


    def get_data_by_list(rows):
        """Convert raw row dicts into a decoded ``pandas.DataFrame``.

        Args:
            rows: list of dicts mapping ``b'family:qualifier'`` to the raw
                cell value (bytes).

        Returns:
            An empty DataFrame when ``rows`` is empty. Otherwise a DataFrame
            whose columns are the bare qualifier names, with text columns
            decoded to ``str``, all other columns decoded to ``int`` and
            ``timestamp`` converted to datetimes (microsecond unit).

        Raises:
            KeyError: if the rows carry no ``timestamp`` column.
        """
        if not rows:
            return pd.DataFrame()

        data = pd.DataFrame(rows)
        for col in data.columns:
            if col in _TEXT_COLUMNS:
                # Rows lacking a text column are dropped; the copy keeps the
                # assignment below from writing into a view of the old frame.
                data = data[data[col].notnull()].copy()
                data[col] = data[col].apply(lambda x: x.decode())
            else:
                data[col] = data[col].apply(
                    lambda x: int.from_bytes(x, byteorder='big', signed=True))

        data = data.rename(columns=lambda x: x.decode().split(':')[1])
        data['timestamp'] = pd.to_datetime(data['timestamp'], unit='us')
        return data


    def get_bigtable_data(player_id, time_cut, path, columns, hour=14):
        """Fetch a player's rows in the ``hour``-long window ending at ``time_cut``.

        Row keys have the form ``<reversed player id>#<time>``; the range
        scanned is ``[time_cut - hour * 3600 * 1000, time_cut)``.

        Args:
            player_id: player identifier; its string form is reversed to
                build the row-key prefix.
            time_cut: end of the window, in the same unit as the row-key
                time component (the ``* 1000`` factor suggests epoch
                milliseconds -- confirm against the table schema).
            path: Bigtable table id.
            columns: list of ``b'family:qualifier'`` columns to read.
            hour: window length in hours.

        Returns:
            The decoded DataFrame produced by ``get_data_by_list``.
        """
        table = bigtable.table.Table(path, instance, app_profile_id='copy-in-tw')
        re_player_id = str(player_id)[::-1]
        end_time = time_cut
        start_time = end_time - int(hour * 3600 * 1000)
        start = '{}#{}'.format(re_player_id, start_time).encode()
        stop = '{}#{}'.format(re_player_id, end_time).encode()

        # Use the raw reader rather than scan(): scan() decodes every value
        # as UTF-8 text, while get_data_by_list() needs the original bytes.
        rows = list(_read_rows(table, start, stop, columns).values())
        return get_data_by_list(rows)


    def get_exposure_data(player_id, time_cut, project='mafia1_ods', hour=14):
        """Fetch gift-bag tracking rows for one player.

        Reads ``<project>.sdk_gift_bag_tracking`` through
        ``get_bigtable_data`` with a fixed set of ``game`` family columns.
        """
        return get_bigtable_data(
            player_id,
            time_cut,
            path="{}.sdk_gift_bag_tracking".format(project),
            columns=[
                b'game:player_id',
                b'game:timestamp',
                b'game:value_1',
                b'game:event_id',
                b'game:action',
                b'game:type',
                b'game:position',
            ],
            hour=hour,
        )


    def _get_next_string(prefix):
        """Return the smallest key greater than every key starting with ``prefix``.

        Returns ``None`` when no such key exists (empty prefix, or a prefix
        made only of ``0xff`` bytes), meaning the range has no upper bound.
        """
        raw = prefix.encode('utf-8') if isinstance(prefix, str) else bytes(prefix)
        raw = raw.rstrip(b'\xff')
        if not raw:
            return None
        return raw[:-1] + bytes([raw[-1] + 1])


    def _get_columns_filter(columns):
        """Build a row filter that keeps only the requested columns.

        Each entry is ``family:qualifier`` (or just ``family``), as str or
        bytes. Names are handed to Bigtable's regex filters unescaped, so
        regex metacharacters in a name are interpreted as such.
        """
        column_filters = []
        # dict.fromkeys de-duplicates while keeping order; a repeated column
        # would otherwise appear twice in the union.
        for column in dict.fromkeys(columns):
            if isinstance(column, bytes):
                column = column.decode('utf-8')
            family, separator, qualifier = column.partition(':')
            family_filter = FamilyNameRegexFilter(family)
            if separator and qualifier:
                column_filters.append(RowFilterChain(filters=[
                    family_filter,
                    ColumnQualifierRegexFilter(qualifier.encode('utf-8')),
                ]))
            else:
                column_filters.append(family_filter)

        if len(column_filters) == 1:
            return column_filters[0]
        return RowFilterUnion(filters=column_filters)


    def _row_to_dict(row):
        """Flatten a Bigtable row to ``{b'family:qualifier': value_bytes}``.

        Only the first cell of each column is kept (presumably the most
        recent version -- confirm against the Bigtable cell ordering).
        """
        return {
            column: cells[0].value
            for column, cells in row.to_dict().items()
            if cells
        }


    def _read_rows(table, start_row=None, end_row=None, columns=None):
        """Read rows from ``table`` and return ``{row_key: raw_row_dict}``.

        ``start_row`` is inclusive and ``end_row`` exclusive; a missing bound
        leaves that side of the range open. With neither bound the whole
        table is read. ``columns`` optionally restricts the columns returned.
        """
        column_filter = _get_columns_filter(columns) if columns else None

        row_set = None
        if start_row or end_row:
            row_set = RowSet()
            row_set.add_row_range_from_keys(
                start_key=start_row or None, end_key=end_row or None)

        return {
            row.row_key: _row_to_dict(row)
            for row in table.read_rows(row_set=row_set, filter_=column_filter)
        }


    def scan(self, table, params):
        """Scan ``table`` and return its rows as nested dicts of text.

        Args:
            self: unused. The function appears to have been lifted from a
                class; the parameter is kept so existing positional callers
                keep working. Pass ``None``.
            table: Bigtable table object to read from.
            params: dict of options:
                ``start_row`` / ``end_row`` -- range bounds (end exclusive);
                ``row_prefix`` -- scan keys with this prefix (cannot be
                combined with ``start_row`` / ``end_row``);
                ``columns`` -- ``family:qualifier`` columns to return;
                ``family_name`` -- strip ``"<family_name>:"`` from column
                names in the result;
                ``replace_prefix`` -- when ``True``, remove ``row_prefix``
                from the returned row keys.

        Returns:
            ``{row_key: {column: value}}`` with keys and values decoded as
            UTF-8 ``str``, or ``None`` when ``table`` is falsy or the
            parameters are inconsistent.

        Raises:
            UnicodeDecodeError: if a row key, column name or cell value is
                not valid UTF-8.
        """
        if not table:
            return None

        start_row = params.get('start_row')
        end_row = params.get('end_row')
        row_prefix = params.get('row_prefix')

        if 'row_prefix' in params and (start_row or end_row):
            # Row prefix cannot be combined with start_row or end_row.
            return None

        if row_prefix:
            start_row = row_prefix
            end_row = _get_next_string(row_prefix)

        scan_list = _read_rows(table, start_row, end_row, params.get('columns'))

        family_name = params.get('family_name')
        replace_prefix = params.get('replace_prefix') is True
        # Row keys are decoded to str below, so the prefix must be str too.
        prefix_text = (row_prefix.decode('utf-8')
                       if isinstance(row_prefix, bytes) else row_prefix)

        info = {}
        for key, row in scan_list.items():
            key = str(key, encoding='utf-8')
            if replace_prefix and prefix_text:
                key = key.replace(prefix_text, '')
            row_dict = {}
            for k, v in row.items():
                k = str(k, encoding='utf-8')
                if family_name:
                    k = k.replace(family_name + ':', '')
                row_dict[k] = str(v, encoding='utf-8')
            info[key] = row_dict
        return info
    Processed: 0.037, SQL: 9