Test, Check data with Bigquery and Email notification from Application integration

มาสร้างแจ้งเตือนอีเมลเวลาข้อมูลผิดปรกติหรือตรวจข้อมูล โดยแค่เขียน SQL เป็นก็พอแล้วกัน ผ่าน Application integration และ Cloud logging

ปัจจุบันเวลาเราเก็บข้อมูลไว้ที่ Bigquery บางทีเราก็ไม่อาจทราบได้ว่าข้อมูลของเราเวลาที่ผ่านการทำ ETL หรือ ELT แล้วมันมีข้อมูลที่ผิดผลาดหรือคลาดเคลื่อนหรือเปล่าอย่างเช่น อยู่ดีๆก็มีข้อมูลซ้ำเกิดขึ้นในตารางของเรา

เพื่อความสะบายใจและสร้างความเชื่อมั่นให้กับข้อมูลแล้ว ปรกติเราควรมีการเช็คข้อมูลอยู่เสมอๆซึ่งเป็นหนึ่งในรูปแบบการทำ Data quality ซึ่งสามารถทำได้หลายวิธีและก็มีหลาย Open source ให้ใช้ อย่างเช่น Great expectation หรือจะเป็น service Data quality ของ GCP เอง (จะมีเขียนถึงในอนาคต)


แต่ในบทความนี้จะพาทำการแจ้งเตือนในรูปแบบง่ายๆและประหยัด รวมไปถึง learning curve ไม่สูงเท่ากับการใช้ Open source ที่ขอแค่เขียน SQL เป็นก็เพียงพอแล้ว โดยสิ่งที่จะได้เขียนถึงคือ

  1. วิธีแจ้งเตือนผ่าน Application integration
  2. วิธีแจ้งเตือนผ่าน Cloud logging
Flow ตัวอย่าง

สามารถเลือกอย่างใดอย่างหนึ่งไปปรับใช้ได้

Introduction

โดยเริ่มต้นนั้นจะคล้ายๆกันคือเรามีตารางอยู่ตารางหนึ่งอยากจะทำการแจ้งเตือนโดยเป็นการส่งอีเมลเมื่อมีความผิดปรกติ โดยในตัวอย่างที่จะมีในบทความคือ การที่เราอยากจะเช็ค ว่าข้อมูลที่ gender นั้นไม่ควรมีความยาวของ character มากกว่า 1 ตัวอักษร เช่น M = เพศชาย F = เพศหญิง N = ไม่ระบุ ถ้าหากเช็คแล้วมีมากกว่า 1 ตัวอักษรจะต้องส่งอีเมลทันที จะได้เข้าไปเช็คข้อมูลอย่างทันท่วงที

Schedule and Assert

โดยใน Bigquery เราสามารถใช้วิธี Assert ได้เหมือนกับการทำ unit test ใน framework อื่นๆ เพื่อที่จะได้ทำการเช็คว่าข้อมูลถูกต้องไหม โดยถ้าไม่ถูกต้องก็จะเกิด Error ทันที เพราะถ้าไม่ผิดปรกติ สิ่งที่ได้ก็แค่ Output ของ query เท่านั้น จะไม่มี Error เกิดขึ้น

โดยการใช้ Assert นั้นตามนี้

ตัวอย่างจาก mock data บน Bigquery
-- Example of how to use Assert
ASSERT (
SELECT
COUNT(gender) > 0
FROM
`your-project.your-dataset.customer_demo`
WHERE
LENGTH(gender) > 0
) AS "gender must not exceed 1 character" -- Assert label
ถ้าข้อมูลเราถูกต้อง Assert จะไม่เกิด Error
กรณีที่ข้อมูลไม่ตรงตามที่เราเขียน Assert จะเห็นว่าจะเกิดเป็น Error

ซึ่งเราสามารถใช้ Assert ได้หลากหลายรูปแบบ ในการทำการทดสอบข้อมูล

ต่อมาเราต้องทำการตั้ง Schedule เพราะเราคงไม่มานั้งกดเองทุกๆ ชั่วโมงหรือวัน ถูกต้องไหมครับ ในหน้า Bigquery ที่เราเขียน SQL นั้นจะมี Schedule อยู่ด้านบน ให้ทำการตั้งค่าเวลาตามต้องการ

!! ก่อนหน้านั้นเราต้องสร้าง Pub/Sub ให้เรียบร้อยก่อนเพื่อที่จะเป็นตัวส่งข้อมูลให้กับ Application integration ในการส่งอีเมล

สร้าง Cloud Pubsub
schedule your query on Bigquery

โดยเราสามารถตั้งค่าเวลาที่จะทำงานอัตโนมัติได้ที่ Scheduule options และ หากต้องการเก็บผลจากการ Query ให้คลิ๊กที่ Destination for query results แต่ของเรานั้นเนื่องจากไม่มี Output เลยไม่จำเป็น สุดท้ายที่สำคัญคือให้เลือก Notification options ให้เป็น Cloud Pub/Sub ที่เราเพิ่งสร้างไป

เพราะถ้าเราเลือกส่ง อีเมลในนี้ อีเมลจะถูกส่งไปหาคนที่สร้าง Schedule เท่านั้น

ตัวอย่างการตั้งค่า Schedule query
Schedule query ก็จะมาอยู่ที่ Data transfer

ต่อจากนี้จะเป็นการสร้าง Notification ในรูปแบบต่างๆตามที่ได้เขียนไว้ด้านบน

1. วิธีแจ้งเตือนผ่าน Application integration

หลังจากที่ได้สร้าง Schedule query เสร็จตามที่ต้องการแล้วให้ไปที่ Application Integration สามารถค้นหาได้ในหน้า Search ในตัวบริการนี้สามารถสร้าง Flow ต่างๆได้ไม่ว่าจะเป็นการรับ Input จาก Pubsub, scheduler, Jira หรือ Saleforce

ตัวอย่างการ integrate ของ Application integrate
Create Applicatio integration

พอเข้ามาที่ Application integrate แล้ว ในหน้าจะเจอแผนผังว่างเปล่า คล้ายๆเวลาที่เราใช้งานพวก Draw.io Miro อะไรพวกนี้ เราเรียกสิ่งนี้ว่า Canvas โดยขั้นตอนที่จะทำใน Application integrate นัั้นจะมีดังนี้

  1. รับข้อมูลจาก Pub/Sub triggers
  2. แปลงข้อมูลจาก Json ที่ได้มาจาก Pub/Sub ให้อยู่ในรูปแบบ String เพื่อจะได้อ่านได้ง่าย
  3. ทำการสร้าง Condition Flow ว่าให้ส่งอีเมลเฉพาะเวลาที่ Error เท่านั้น
  4. ส่งอีเมลเพื่อส่งแจ้งเตือน

รับข้อมูลจาก Pub/Sub triggers

บน Application integration ให้เรากดไปที่ Triggers แล้วเลือกที่ Pub/Sub

เลือก Pub/Sub ในการรับข้อมูล
ใส่ข้อมูลที่จำเป็นอย่าง Pub/Sub topic และ Service account

แปลงข้อมูลจาก Json ที่ได้มาจาก Pub/Sub ให้อยู่ในรูปแบบ String เพื่อจะได้อ่านได้ง่าย

ก่อนจะทำการดัดแปลงปรับปรุงข้อมูลเราต้องไปดูข้อมูลที่มาจาก Pub/Sub ก่อนว่าเป็นมาหน้าตายังไง

{
"data": "{\n \"dataSourceId\": \"scheduled_query\"
,\n \"destinationDatasetId\": \"\"
,\n \"emailPreferences\": {\n \n }
,\n \"endTime\": \"2024-10-14T06:50:20.297228Z\"
,\n \"errorStatus\": {\n \"code\": 3
,\n \"message\": \"gender must not exceed 1 character; JobID: 830755692475:scheduled_query_670b4266-0000-230b-a42e-ac3eb15212c8\"\n }
,\n \"name\": \"projects/8338554546/locations/us-central1/transferConfigs/670b7cec-0000-229b-92f3-14c14ef363c0/runs/670b4266-0000-230b-a42e-ac3eb15212c8\"
,\n \"notificationPubsubTopic\": \"projects/your-project/topics/alert-message\"
,\n \"params\": {\n \"query\": \"ASSERT (\\r\\n SELECT\\r\\n COUNT(gender) >0\\r\\n FROM\\r\\n `your-project.your-dataset.customer_demo`\\r\\n WHERE\\r\\n LENGTH(gender) > 0\\r\\n ) AS \\\"gender must not exceed 1 character\\\"\"\n }
,\n \"runTime\": \"2024-10-14T06:49:04.749Z\"
,\n \"scheduleTime\": \"2024-10-14T06:49:05.018337Z\"
,\n \"startTime\": \"2024-10-14T06:49:05.248137Z\"
,\n \"state\": \"FAILED\"
,\n \"updateTime\": \"2024-10-14T06:50:20.29727Z\"
,\n \"userId\": \"-6694879554424758352\"\n}\n"
,
"attributes": {
"eventType": "TRANSFER_RUN_FINISHED",
"payloadFormat": "JSON_API_V1"
},
"messageId": "12631583862038181",
"publishTime": "2024-10-14T06:50:20.358Z"
}

ซึ่งข้อมูลที่เราต้องการนั้นคือ data.errorStatus เพื่อที่จะเอาไปส่งอีเมลแจ้งเตือนและในส่วนนี้ถ้าข้อมูลของเราไม่มี Error จะเกิดเป็น “{ }”

พอเรารู้แล้วว่าหน้าตาข้อมูลเป็นยังไงก็มาต่อกันที่การทำ Data Transform โดยไปที่ Tasks แล้วเลือก Data Transformer

Data Transformer, a tool to transform your data

หลังจากนั้นก็คลิ๊ก Open data transformer editor เพื่อจะทำการวาง Script data transform กัน

Click open data transformer editor

ให้ทำการ +ADD INPUT แล้วก็เลือก Variable เป็น CloudPubSubMessage

แล้วก็มาสร้าง Output variable รอไว้โดยไปที่ +ADD OUTPUT ด้านซ้ายมือแล้วใส่ Name เป็นพอแล้วสำหรับเบื้องต้น

Create output variable

เริ่ม Transform ข้อมูลกันโดยไปที่ ADD TRANSFORMATION แล้วเราจะได้ TRANSFORMATION มาวางอยู่บน canvas ให้ลากเส้น Input ไปหา TRANSFORMATION และลากเส้น TRANSFORMATION ไปหา Output

Drag input to transfromation and connect it with output

คลิ๊กไปที่ TRANSFORMATION เราจะเจอ +ADD OPERATION ก็จะเห็น Operation ต่างๆมากมาย โดยถ้าจำได้ เราต้องการข้อมูลในส่วน errorStatus ซึ่งอยู่ใน Key data เพราะแต่อย่างที่เห็นว่ามีการเก็บ errorStatus เป็น string ขั้นตอนในการเขียน Operation ก็คือ Get property (data)-> Parse json -> Get property(errorStatus) -> To string

Get property object and input Key as data also do like this for Get property step for “errorStatus” too
Step of operations

หรือจะเขียนเป็น Script ก็ได้โดยใช้ภาษา Jsonnet script

local f = import "functions";
local inputs = {"CloudPubSubMessage": std.extVar("CloudPubSubMessage")};
{"erroroutput": std.toString(std.parseJson(inputs["CloudPubSubMessage"]["data"])["errorStatus"])}

ทำการสร้าง Condition Flow ว่าให้ส่งอีเมลเฉพาะเวลาที่ Error เท่านั้น

พอเราได้ข้อมูลที่ต้องการแล้วจากขั้นตอน Transformation ทีนี้เราก็มาสร้าง Condition flow กันโดยในทีนี้จะเรียกว่า Edge condition ก่อนที่เราจะสามารถสร้าง Condition ได้เราต้องนำ Send Email มาวางบน canvas ก่อน แล้วคลิ๊กไปที่เส้นที่โยงระหว่าง Transformer และ Email จะมีให้สร้าง Edge condition

we want to check that if Error is exist it should not be { }

อย่างที่บอกไว้ว่าถ้า Error นั้นไม่มีเราจะได้ค่า data.errorStatus เป็น { } (ปีกกาที่มีช่องว่าง1ช่อง)

ส่งอีเมลเพื่อส่งแจ้งเตือน

และแล้วก็มาถึง config สุดท้ายก็คือ Send Email ในพาร์ทนี้ก็ไม่มีอะไรเลยครับใส่ข้อมูล Recipient ที่เราต้องการ แต่ที่สำคัญคือ Body in plain text : ถ้าเราต้องการที่จะมีข้อมูล Error ส่งไปด้วยเราจะต้องใส่ Variable ในที่ตรงนี้

don’t forget Body in plain text

มาดูกันว่าเวลาที่ Error ถูกส่งมาเป็นยังไง

Email ที่มีการส่งมาจาก Application integration

ถ้าอยากปรับหน้าตาหรือรูปแบบการส่งอีเมลก็สามารถทำเพิ่มเติมได้เช่นกันก็สามารถเพิ่ม Transformer หรือ Mapping เอา และก็จบไปแล้วกับการทำ Notify ผ่าน Application integration.

Bonus! เราสามารถทำ format แบบนี้ก็ได้ผ่านตัว Application integration ในตัวอย่างเป็นการไป query count row

2. วิธีแจ้งเตือนผ่าน Cloud logging

มาที่วิธีการสร้าง Notification ผ่าน Cloud logging Alert กันบ้าง

จากข้างต้นที่เราที่ได้ทำการสร้าง Schedule query ไปแล้วสังเกตว่าถ้าตอนที่เรา query Assert นั้นเวลาที่มีข้อมูลผิดผลาดจะเกิด Error ใช่ไหมครับ นั้นแหละ Alert จะไปเกิดที่ Cloud logging เช่นกัน

เริ่มแรกให้เราไปที่ Cloud logging ไปหา log ที่เป็น Error ของเราโดยดูที่ ProtoPayload แล้วก็กดที่ status จะเห็น Label ที่เราสร้างไว้ให้กับ Assert ของเรา หลังจากก็คลิ๊ก Show matching entries

Cloud logging ที่บอก Error ของเรา

เราจะได้ Logging Query มา

resource.type="bigquery_resource"
severity=ERROR
protoPayload.status.message="gender must not exceed 1 character"

หลังจากนั้นให้ไปสร้าง Alert โดยไปที่ Actions แล้วคลิ๊ก Create log alert

Create log alert

หลังจากนั้นก็ตามขั้นตอนนี้

policy name ตามใจชอบครับ ส่วน Document ก็แล้วแต่ว่าจะใส่อะไรเช่น เวลาเจอ Error ต้องทำยังไงเป็นต้นเป็น
เป็น log query ที่เราได้มาจากข้างต้นและในส่วน label จะใส่หรือไม่ก็ได้
เลือกว่าจะให้ให้เตือนถี่มากน้อยแค่ไหน
เลือกว่าจะแจ้งเตือนยังไง ในที่นี้เราจะแจ้งผ่านอีเมลเมื่อคลิ๊ก drop down จะมีให้เลือกแต่ถ้าไม่มีให้กดสร้างผ่าน Manage notification ได้เลย

หลังจากลองเทสเมื่อเวลาเกิด Error นั้นก็จะได้อีเมลตามนี้

Alert email from Cloud logging

Conclusion

ก็จบไปแล้วสำหรับการทำ notification ในรูปแบบทั้งผ่าน Application integration และ Cloud logging เหมาะสำหรับทีมที่ยังเล็กๆและไม่มีความรู้ในการไปเรียกใช้งาน Open source หรือจะนำไปใช้ในรูปแบบอื่นๆ แต่อย่างน้อยก็รับรู้ถึงการใช้ notification ในรูปแบบต่างโดยไม่ต้องเขียน Code สักนิดเดียว (ยกเว้น SQL 🫣)

ถ้าเปรียบเทียบทั้งสองตัว Application integration และ Cloud logging alert นั้นทั้งสองสามารถใช้งานได้ทั้งคู่แต่ Cloud logging alert จะไม่สามารถปรับแต่งข้อความหรือแสเงผลอะไรได้มากนัก แต่ข้อดีเลยคือทำได้ไว แต่ถ้าเราไปใช้ Application integration เราสามารถปรับแต่งข้อมูลได้เยอะก่อนที่จะมีการส่งอีเมลไปหาผู้รับและยังสามารถทำงานอื่นๆได้ด้วย เช่นเป็นตัวส่งข้อมูลรายวันให้เรา โดยเอาข้อมูลจากการ Query เป็นต้น

Reference

https://cloud.google.com/application-integration/docs/overview


Follow me here! any question accept

Medium:

https://cloud.google.com/application-integration/docs/overview

FB page:

https://cloud.google.com/application-integration/docs/overview

linkedin: https://www.linkedin.com/in/kriangsak-sumthong/

Please subscribe!

Leave a comment