Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- #import snowflake connector module
- import snowflake.connector
- import sys
- import os
- from cryptography.hazmat.backends import default_backend
- from cryptography.hazmat.primitives.asymmetric import rsa
- from cryptography.hazmat.primitives.asymmetric import dsa
- from cryptography.hazmat.primitives import serialization
- # 確定輸入的參數是否給定正確
- # Sample: python select_data.py <account> <user> <role>
- if len(sys.argv) < 4 :
- print("ERROR: Please pass the following command-line parameters in order:",end='\n')
- print("account,user,role.")
- sys.exit(-1)
- else:
- ACCOUNT = sys.argv[1]
- USER = sys.argv[2]
- ROLE = sys.argv[3]
- with open("/Users/abehsu/Documents/Snowflake/Snowpipe_poc/rsa_key.p8", "rb") as key:
- p_key = serialization.load_pem_private_key(
- key.read(),
- password = os.environ['PRIVATE_KEY_PASSPHRASE'].encode(),
- backend=default_backend()
- )
- pkb = p_key.private_bytes(
- encoding=serialization.Encoding.DER,
- format=serialization.PrivateFormat.PKCS8,
- encryption_algorithm=serialization.NoEncryption()
- )
- con = snowflake.connector.connect(
- account=ACCOUNT,
- user=USER,
- role=ROLE,
- private_key=pkb
- )
- con.cursor().execute("""
- USE WAREHOUSE tiny_warehouse_mg;
- """)
- con.cursor().execute("""
- USE SCHEMA testdb_mg.testschema_mg;
- """)
- # # 傳統方式
- # cur = con.cursor()
- # try:
- # cur.execute("""
- # SELECT COL1,COL2 FROM test_table ORDER BY COL1
- # """)
- # for (col1,col2) in cur:
- # print('%s,%s' %(col1,col2))
- # finally:
- # cur.close()
- # # 比較便利的語法
- # for (col1,col2) in con.cursor().execute("SELECT COL1,COL2 FROM test_table ORDER BY COL1"):
- # print('%s,%s' %(col1,col2))
- # # 如果只想得到一筆結果的話,可以使用fetchone()
- # col1, col2 = con.cursor().execute("SELECT COL1, COL2 FROM TEST_TABLE ORDER BY COL1").fetchone()
- # print('%s,%s' %(col1,col2))
- # # 如果想要一次取得特定的筆數,可以使用fetchmany()
- # cur = con.cursor().execute("SELECT COL1, COL2 FROM TEST_TABLE ORDER BY COL1")
- # ret = cur.fetchmany(3)
- # print(ret)
- # while len(ret) > 0 :
- # ret = cur.fetchmany(3)
- # print(ret)
- # 如果想要一次拿到所有結果
- results = con.cursor().execute("SELECT col1, col2 FROM test_table").fetchall()
- for rec in results:
- print('%s, %s' % (rec[0], rec[1]))
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement