BigQuery Sharded Tables

Three ways to detect and parameterize date-sharded tables (names ending in a `YYYYMMDD` suffix, e.g. `table_20220101`) in BigQuery.

Using an Array of Structs
```python
from google.cloud import bigquery

location = "asia-northeast3"
my_account = "# rho715"

def runQuery_DF(query):
    final_query = "\n".join([my_account, query])  # prepend the account comment so the job is attributable
    client = bigquery.Client(location=location)
    data = client.query(final_query).to_dataframe()  # run the query, fetch the result as a DataFrame
    client.close()
    print('\nquery : ', final_query)
    return data
```
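For reference, a minimal sanity-check call (hypothetical, not in the original post); the prepended `# rho715` comment shows up in the job's query text, which makes these jobs easy to find in BigQuery's job history:

```python
df = runQuery_DF("SELECT 1 AS one")  # the printed query starts with "# rho715"
print(df)
```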
query = """
with data_struct as (
SELECT
[struct('{project_id}.{dataset}.{table_name}' as table_address, '' as table_type )
, struct('{project_id}.{dataset}.{table_name}_20220101' as table_address, 'sharded' as table_type )
] as sample_table
)
SELECT
SPLIT(sample_table_elem.table_address, ".")[OFFSET(0)] AS project,
SPLIT(sample_table_elem.table_address, ".")[OFFSET(1)] AS dataset,
SPLIT(sample_table_elem.table_address, ".")[OFFSET(2)] AS table,
sample_table_elem.table_address,
REGEXP_REPLACE(table_address, r"(20[0-9]{6})", "{yyyymmdd}") param, -- change 20220101 -> {yyyymmdd} : for parameter_use
regexp_extract((table_address), r"(20[0-9]{6})") yyyymmdd,
sample_table_elem.table_type,
FROM data_struct, UNNEST(sample_table) as sample_table_elem
"""
runQuery_DF(query)
| | project | dataset | table | table_address | param | yyyymmdd | table_type |
|---|---|---|---|---|---|---|---|
| 0 | {project_id} | {dataset} | {table_name} | {project_id}.{dataset}.{table_name} | {project_id}.{dataset}.{table_name} | None | |
| 1 | {project_id} | {dataset} | {table_name}_20220101 | {project_id}.{dataset}.{table_name}_20220101 | {project_id}.{dataset}.{table_name}_{yyyymmdd} | 20220101 | sharded |
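The `param` column is the useful output: the regex turns a concrete shard address into a reusable template. A minimal pure-Python sketch of the same round trip (the helper names `to_param`/`to_shard` are illustrative, not from the original):

```python
import re

SHARD_RE = re.compile(r"20[0-9]{6}")  # same pattern as the query: a YYYYMMDD run starting with "20"

def to_param(table_address: str) -> str:
    """Replace the date in a shard address with a {yyyymmdd} placeholder."""
    return SHARD_RE.sub("{yyyymmdd}", table_address)

def to_shard(param_address: str, yyyymmdd: str) -> str:
    """Fill the placeholder back in for a concrete date."""
    return param_address.replace("{yyyymmdd}", yyyymmdd)

assert to_param("proj.ds.log_20220101") == "proj.ds.log_{yyyymmdd}"
assert to_shard("proj.ds.log_{yyyymmdd}", "20230127") == "proj.ds.log_20230127"
```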
Using INFORMATION_SCHEMA.TABLES
The same `runQuery_DF` helper from the first section is reused here.
query = """
-- DECLARE d_yyyymmdd STRING DEFAULT '20230101';
DECLARE d_yyyymmdd STRING DEFAULT FORMAT_DATE("%Y%m%d", CURRENT_DATE("Asia/Seoul"));
with sharded_table_filter as (
select REGEXP_REPLACE(table_name, r"(20[0-9]{6})", "{yyyymmdd}") table_name
, REGEXP_REPLACE(max(table_name), r"(20[0-9]{6})", d_yyyymmdd ) max_table_name --max(table_name) max_table_name
, regexp_extract(max(table_name), r"(20[0-9]{6})") yyyymmdd
from {input_dataset}.INFORMATION_SCHEMA.TABLES
group by table_name
) -- 샤딩테이블은 가장 최신 테이블을 불러온다음 날짜를 "{yyyymmdd}"로 바꿈
select
A.table_schema,
stf.table_name,
A.column_name,
A.DATA_TYPE,
C.description,
B.table_type,
is_partitioning_column,
'' as sample_data,
ordinal_position,
is_nullable,
clustering_ordinal_position,
stf.max_table_name
from {input_dataset}.INFORMATION_SCHEMA.COLUMNS A
inner join sharded_table_filter stf on A.table_name = stf.max_table_name
left join {input_dataset}.INFORMATION_SCHEMA.TABLES B on A.table_name = B.table_name
left join {input_dataset}.INFORMATION_SCHEMA.COLUMN_FIELD_PATHS C on concat(A.table_name, A.column_name) = concat(C.table_name,C.column_name)
order by 1,2;
"""
runQuery_DF(query)
| | table_schema | table_name | column_name | DATA_TYPE | description | table_type | is_partitioning_column | sample_data | ordinal_position | is_nullable | clustering_ordinal_position | max_table_name |
|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | {input_dataset} | log_{yyyymmdd} | uno | STRING | user number | BASE TABLE | NO | | 1 | YES | <NA> | log_20230127 |
| 1 | {input_dataset} | log_{yyyymmdd} | profile_id | STRING | user profile id | BASE TABLE | NO | | 2 | YES | <NA> | log_20230127 |
| 2 | {input_dataset} | log_{yyyymmdd} | device_type | STRING | client device type | BASE TABLE | NO | | 3 | YES | <NA> | log_20230127 |
| 3 | {input_dataset} | log_{yyyymmdd} | program_id | STRING | program id | BASE TABLE | NO | | 4 | YES | <NA> | log_20230127 |
| 4 | {input_dataset} | log_{yyyymmdd} | content_id | STRING | content id | BASE TABLE | NO | | 5 | YES | <NA> | log_20230127 |
| ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... |
| 71 | {input_dataset} | event_{yyyymmdd} | guid | STRING | client device unique ID | BASE TABLE | NO | | 3 | YES | <NA> | event_20230127 |
| 72 | {input_dataset} | event_{yyyymmdd} | params | STRING | http params | BASE TABLE | NO | | 4 | YES | <NA> | event_20230127 |
| 73 | {input_dataset} | event_{yyyymmdd} | bodies | STRING | uievent log data in JSON format | BASE TABLE | NO | | 5 | YES | <NA> | event_20230127 |
| 74 | {input_dataset} | event_{yyyymmdd} | ap_date | STRING | time the log server received the log (KST) | BASE TABLE | NO | | 6 | YES | <NA> | event_20230127 |
| 75 | {input_dataset} | event_{yyyymmdd} | ap_timestamp | TIMESTAMP | timestamp value of ap_date (KST) | BASE TABLE | YES | | 7 | YES | <NA> | event_20230127 |

76 rows × 12 columns
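Once a table family is identified as sharded, the shards can also be queried together through a BigQuery wildcard table; `_TABLE_SUFFIX` exposes the same date part the regex above extracts. A sketch using the same helper (the dataset and `log_` prefix are placeholders, not from the original post):

```python
query = """
SELECT _TABLE_SUFFIX AS yyyymmdd, COUNT(*) AS row_cnt
FROM `{input_dataset}.log_*`
WHERE _TABLE_SUFFIX BETWEEN '20230125' AND '20230127'
GROUP BY yyyymmdd
ORDER BY yyyymmdd
"""
runQuery_DF(query)
```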
Using ddl
Again, the same `runQuery_DF` helper is reused.
query = """
select table_catalog
, table_schema
, table_name
, current_date as yyyy_mm_dd
, ddl
, replace(ddl, 'CREATE TABLE', 'CREATE TABLE IF NOT EXISTS') ddl_create_if_not_exists
from {input_project_id}.{input_dataset}.INFORMATION_SCHEMA.TABLES t2
order by 1 ,3
limit 5
"""
runQuery_DF(query)
| | table_catalog | table_schema | table_name | yyyy_mm_dd | ddl | ddl_create_if_not_exists |
|---|---|---|---|---|---|---|
| 0 | {input_project_id} | {input_dataset} | log_20230125 | 2023-01-27 | CREATE TABLE `{input_project_id}.{input_dataset}.log_2023... | CREATE TABLE IF NOT EXISTS `{input_project_id}.{input_data... |
| 1 | {input_project_id} | {input_dataset} | log_20230126 | 2023-01-27 | CREATE TABLE `{input_project_id}.{input_dataset}.log_2023... | CREATE TABLE IF NOT EXISTS `{input_project_id}.{input_data... |
| 2 | {input_project_id} | {input_dataset} | log_20230127 | 2023-01-27 | CREATE TABLE `{input_project_id}.{input_dataset}.log_2023... | CREATE TABLE IF NOT EXISTS `{input_project_id}.{input_data... |
| 3 | {input_project_id} | {input_dataset} | appsflyer_20220617 | 2023-01-27 | CREATE TABLE `{input_project_id}.{input_dataset}.appsflyer_2... | CREATE TABLE IF NOT EXISTS `{input_project_id}.{input_data... |
| 4 | {input_project_id} | {input_dataset} | appsflyer_20220618 | 2023-01-27 | CREATE TABLE `{input_project_id}.{input_dataset}.appsflyer_2... | CREATE TABLE IF NOT EXISTS `{input_project_id}.{input_data... |
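Because `CREATE TABLE IF NOT EXISTS` is idempotent, the rewritten DDL can be replayed safely, e.g. to recreate missing shards after copying a dataset. A minimal sketch (not from the original post), assuming the DataFrame returned above and table-creation rights on the target dataset:

```python
ddl_df = runQuery_DF(query)

client = bigquery.Client(location=location)
for stmt in ddl_df["ddl_create_if_not_exists"]:
    client.query(stmt).result()  # waits for each DDL job; a no-op for shards that already exist
client.close()
```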