from config import path_dir
import os
import time
import json
from google.cloud import bigtable
# from google.cloud import happybase
import pandas as pd
from google.cloud.bigtable.row_filters import RowFilterChain
from google.cloud.bigtable.row_set import RowSet
# Module-level Bigtable setup. Order matters: the credentials environment
# variable is set before the client is constructed.
# The key file path is built from the project-local `path_dir()` helper.
os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = path_dir() + "/train_data/p.json"
# Shared client/instance handles; `instance` is used by get_bigtable_data below.
client = bigtable.Client(project="heidao-market", admin=True)
instance = client.instance("yotta-bigtable")
# Earlier connection attempts (happybase and a direct Table handle), kept
# commented out for reference only:
# connection = happybase.Connection(instance=instance)
# connection = bigtable.table.Table(instance
# , app_profile_id='copy-in-tw'
# )
def get_data_by_list(rows):
    """Convert raw Bigtable row dicts into a decoded pandas DataFrame.

    Args:
        rows: Sequence of dicts mapping ``b'family:qualifier'`` column names
            to raw ``bytes`` cell values (one dict per Bigtable row).

    Returns:
        pandas.DataFrame whose columns are named by the qualifier only (the
        text after the first ``:``). Rows that lack ``game:value_1`` or
        ``game:position`` (when those columns exist) are dropped. Those two
        columns are decoded as text; every other column is decoded as a
        big-endian signed integer. A ``timestamp`` column, if present, is
        converted to datetimes, interpreting the integers as microseconds
        since the epoch. An empty DataFrame is returned for empty input.
    """
    if len(rows) == 0:
        return pd.DataFrame()

    text_columns = (b'game:value_1', b'game:position')

    def decode_int(raw):
        return int.from_bytes(raw, byteorder='big', signed=True)

    data = pd.DataFrame(rows)

    # Drop rows lacking a text column *before* decoding anything, so a row
    # that is going to be discarded can never break integer decoding.
    for col in [c for c in data.columns if c in text_columns]:
        data = data[data[col].notna()]
    # Work on an independent frame so the column assignments below do not
    # write into a filtered view of the original.
    data = data.copy()

    for col in data.columns:
        if col in text_columns:
            data[col] = data[col].map(lambda raw: raw.decode())
        else:
            # Cells missing from a row show up as NaN; leave them as NaN
            # instead of passing a float to int.from_bytes (TypeError).
            data[col] = data[col].map(decode_int, na_action='ignore')

    # Keep everything after the first ':' so qualifiers that themselves
    # contain a colon are not truncated.
    data = data.rename(columns=lambda name: name.decode().split(':', 1)[-1])

    # The timestamp column is only present when the caller requested it.
    if 'timestamp' in data.columns:
        data['timestamp'] = pd.to_datetime(data['timestamp'], unit='us')
    return data
def get_bigtable_data(player_id, time_cut, path, columns, hour=14):
    """Fetch one player's rows from a Bigtable table for a trailing time window.

    Row keys are built as ``'<reversed player_id>#<time>'``. The window ends
    at ``time_cut`` and starts ``hour * 3600 * 1000`` units earlier
    (presumably epoch milliseconds -- confirm against the writer of the
    table).

    Args:
        player_id: Player identifier; its string form is reversed to build
            the row-key prefix.
        time_cut: End of the window, in the same unit as the row-key time.
        path: Bigtable table id.
        columns: Iterable of ``b'family:qualifier'`` names to keep. When
            empty, every column of each row is kept.
        hour: Window length in hours.

    Returns:
        pandas.DataFrame produced by ``get_data_by_list`` from the matching
        rows (empty when nothing matches).
    """
    table = bigtable.table.Table(path, instance, app_profile_id='copy-in-tw')

    reversed_id = str(player_id)[::-1]
    end_time = time_cut
    start_time = end_time - int(hour * 3600 * 1000)
    start_key = '{}#{}'.format(reversed_id, start_time).encode()
    stop_key = '{}#{}'.format(reversed_id, end_time).encode()

    # Read the key range directly. The module-level `scan` helper cannot be
    # used here: it expects an owner object as first argument, returns a
    # dict rather than (key, row) pairs, and decodes every value as UTF-8
    # text, whereas get_data_by_list needs the raw bytes.
    row_set = RowSet()
    row_set.add_row_range_from_keys(start_key=start_key, end_key=stop_key)

    wanted = set(columns) if columns else set()
    rows = []
    for row in table.read_rows(row_set=row_set):
        record = {}
        # to_dict() keys cells by b'family:qualifier'; the first cell of
        # each column is taken as its value.
        # NOTE(review): column selection is done client-side; a server-side
        # row filter would reduce transfer if that becomes a concern.
        for name, cells in row.to_dict().items():
            if cells and (not wanted or name in wanted):
                record[name] = cells[0].value
        if record:
            rows.append(record)

    return get_data_by_list(rows)
def get_exposure_data(player_id, time_cut, project='mafia1_ods', hour=14):
    """Fetch a player's gift-bag tracking rows for a trailing time window.

    Thin wrapper around ``get_bigtable_data`` that targets the
    ``<project>.sdk_gift_bag_tracking`` table with a fixed column list.

    Args:
        player_id: Player identifier, forwarded unchanged.
        time_cut: End of the time window, forwarded unchanged.
        project: Prefix of the table id.
        hour: Window length in hours, forwarded unchanged.

    Returns:
        Whatever ``get_bigtable_data`` returns for that table and columns.
    """
    # Each column is requested once (the original list repeated
    # b'game:event_id').
    columns = [
        b'game:player_id',
        b'game:timestamp',
        b'game:value_1',
        b'game:event_id',
        b'game:action',
        b'game:type',
        b'game:position',
    ]
    table_path = "{}.sdk_gift_bag_tracking".format(project)
    return get_bigtable_data(player_id, time_cut, path=table_path,
                             columns=columns, hour=hour)
def scan(self, table, params):
    """Read rows from a Bigtable table and return them as nested str dicts.

    Args:
        self: Owner object supplying the helpers ``_get_next_string``,
            ``_get_columns_filter`` and ``_row_to_dict``. They are not
            defined in this module; ``_row_to_dict`` is assumed to return a
            dict of bytes column name -> bytes value (verify against the
            owner's implementation).
        table: Table object exposing ``read_rows(row_set=..., filter_=...)``.
        params: Dict of options:
            ``start_row`` / ``end_row``: bounds of the key range; either
                one may be omitted for an open-ended range.
            ``row_prefix``: scan keys with this prefix; cannot be combined
                with a non-empty ``start_row`` / ``end_row``.
            ``columns``: passed to ``self._get_columns_filter``.
            ``family_name``: if set, ``'<family_name>:'`` is removed from
                column names in the result.
            ``replace_prefix``: if exactly ``True`` and a row prefix was
                given, the prefix text is removed from the returned keys.

    Returns:
        Dict mapping each row key (str) to a dict of column name (str) ->
        value (str), or ``None`` when ``table`` is falsy or ``row_prefix``
        is combined with ``start_row`` / ``end_row``.
    """
    if not table:
        return None

    start_row = params.get('start_row')
    end_row = params.get('end_row')
    # Always bound, so the replace_prefix handling below cannot hit an
    # unbound local when no prefix was supplied.
    row_prefix = params.get('row_prefix')

    if 'row_prefix' in params and (start_row or end_row):
        # Row prefix cannot be combined with start_row or end_row.
        return None

    if 'row_prefix' in params:
        start_row = row_prefix
        end_row = self._get_next_string(row_prefix)

    column_filter = None
    if params.get('columns'):
        column_filter = self._get_columns_filter(params['columns'])

    row_set = None
    if start_row or end_row:
        # A missing bound is passed as None (open-ended) instead of
        # silently dropping the range and scanning the whole table.
        row_set = RowSet()
        row_set.add_row_range_from_keys(start_key=start_row or None,
                                        end_key=end_row or None)

    family_name = params.get('family_name')

    # Row keys are compared as str below, so a bytes prefix is decoded first.
    prefix_text = row_prefix
    if isinstance(prefix_text, bytes):
        prefix_text = prefix_text.decode('utf-8')
    strip_prefix = params.get('replace_prefix') is True and bool(prefix_text)

    info = {}
    for row in table.read_rows(row_set=row_set, filter_=column_filter):
        key = str(row.row_key, encoding='utf-8')
        if strip_prefix:
            key = key.replace(prefix_text, '')
        row_dict = {}
        for column, value in self._row_to_dict(row).items():
            column = str(column, encoding='utf-8')
            if family_name:
                column = column.replace(family_name + ':', '')
            row_dict[column] = str(value, encoding='utf-8')
        info[key] = row_dict
    return info
# Reposted from https://ipadbbs.8miu.com/read-1405.html -- please credit the original source when reposting.