Part 1 ทำความรู้จักว่าทำงานยังไง ทำอะไรได้บ้าง ตัวอย่างการใช้งานผ่าน Job builder

The introduction of Cloud Dataflow
ในโลกที่หมุนด้วยข้อมูลต่างๆนั้น การรวบรวมข้อมูลก็เป็นเรื่องสำคัญไม่แพ้กัน แต่จะเอาข้อมูลมาใช้ได้ยังไง ในเมื่อข้อมูลเราอยู่ตามที่ต่างๆ ไม่ว่าจะเป็น csv, database (mysql, mongodb, sql server), pub/sub etc.., การทำ ETL (extract transform load) ELT(extract load transform) จึงเป็นส่วนสำคัญในลำดับต้นๆที่ทางธุรกิจจำเป็นต้องทำเพื่อที่จะนำข้อมูลไปใช้ได้

ด้านผู้บริการด้าน Cloud computing ที่ชื่อว่า Google cloud platform ก็มีบริการที่เรียกว่า Cloud dataflow ที่มี open source ชื่อว่า Apache Beam ในการทำงานอยู่เบื้องหลังนั้นเป็นเครื่องมือที่สามารถทำ etl elt โดยเป็นได้ในรูปแบบ batch หรือ realtime ก็ได้ขึ้นอยู๋กับงานที่เราต้องการจะทำ

Part 1 จะเป็นการแนะนำความสามารถเบื้องต้นและตัวอย่างการทำ ETL ELT ในรูปแบบทั้ง realtime และ batch
ทำไมต้อง Cloud Dataflow
– Managed : กล่าวคือทาง GCP จัดการ resouce ให้หมดทุกอย่างเช่น สร้าง vm ขึ้นมา โดยเรามีหน้าทำแค่สร้าง template และเลือก setting ต่างๆเท่านั้น
– Scalable : Dataflow นั้นเราสามารถเลือกให้ทำงานแบบ autoscale ได้
– Portable : โดยหลังบ้านของ Daflow นั้นทำงานด้วย Apache Beam SDKs ซึ่ง Beam นี้ไม่ได้จำเป็นจะต้องเขียนเพื่อนำมาใช้งานที่ Dataflow อย่างเดียวสามารถนำไปใช้งานที่อื่นได้ด้วยเพียงแก้ไขเล็กน้อย หรือ จะเป็นการนำจากที่อื่นเช่น Apache Flink
– Flexible : สามารถใช้งานได้ทั้งรูปแบบเขียนโค้ดเอง หรือไปเลือกจาก template ที่ทาง GCP มีไว้ให้
– Observable : ทาง GCP จัดการเรื่อง monitor ให้กับเราเรียบร้อยโดยเราสามารถเข้าไปดูการทำงานของ Dataflow ได้ตลอดเวลาและเช็คได้ว่า error หรือ failed เกิดจากอะไร
– Cost effective : ราคาการใช้งานของ Dataflow นั้นราคาเท่ากับการเปิด VM ใช้เลยกล่าวคือ ใช้เท่าไหร่จ่ายเท่านั้น
เจาะลึกการใช้งาน Dataflow
โดย Dataflow จะมีรูปแบบการเลือกใช้งานได้ 4 รูปแบบ
1. เลือกใช้จาก Job template ที่ทาง GCP สร้างไว้ให้
2. สร้างจาก Job builder (เพิ่งเปิดให้ใช้งานตอนปี 2024 ก่อนหน้านี้ไม่มีให้เลือกเศร้ามาก 😢 และ source sink ยังน้อยมีให้เลือกแค่ Pub/Sub GCS และ Bigquery)
3. เขียนโค้ดขึ้นมาเองผ่าน Apache Beam SDKs แล้วนำไปสร้างเป็น Custom template
4. เขียนผ่าน jupyter notebook โดยเขียนผ่าน Apache Beam SDKs
การเลือกใช้ว่าจะใช้รูปแบบไหนขึ้นอยู่กับขั้นตอนของเรามีความยุ่งยากไหม
No complex extract load job = Job template
Some transform but still not so complex ETL job = Job builder
Complex transform ETL job และ souce sink ไม่มีให้เลือกจากตัวเลือกก่อนหน้า = custom template with Apche Beam or Jupyter Notebook
Job template → Job builder → Custom Job with either Beam or Notebook
Job template
การเลือกใช้จาก Job template ที่ทาง GCP สร้างไว้ให้ก่อนเพียงเราแค่ใส่ค่า parameter จำพวก source และ sink เท่านั้นและอื่นๆอีกเล็กน้อย มาดูกันว่ามี Dataflow template อะไรบ้าง โดยแบ่งออกเป็น 2 แบบ Streaming และ Batch ตัวอย่าง template เช่น
Streaming (Realtime) template:
- Pub/Sub to BigQuery
- Pub/Sub Subscription to BigQuery
- Pub/Sub to Avro Files on Cloud Storage
- Apache Kafka to Apache Kafka
- Apache Kafka to BigQuery
- Apache Kafka to Cloud Storage
- Change Data Capture from MySQL to BigQuery (Stream)
- Bigtable change streams to BigQuery
Batch template:
- BigQuery export to Parquet (via Storage API)
- BigQuery to Cloud Storage TFRecords
- BigQuery to Elasticsearch
- Oracle to BigQuery
- PostgreSQL to BigQuery
- SQL Server to BigQuery
- Cloud Storage CSV files to BigQuery
- Cloud Storage Text to BigQuery

Job builder
Job builder คือการสร้าง pipeline ผ่านหน้า console โดยระหว่างที่เรากำลังเพิ่ม parameter ต่างๆ ทางด้านขวาจะมีหน้าต่าง visualize flow มาให้ดูด้วยว่า flow ของ pipeline หน้าตาประมาณไหนซึ่งตอนนี้ Source และ Sink ยังมีแค่ Pub/Sub, Cloud storage และ Bigqeury เท่านั้นเอง แต่เราจะมีตัวอย่างการใช้งานซึ่งอ่านจบทำเป็นแน่นอน👌✌️

หลักๆแล้วหน้าต่างของ job builder จะมีสามส่วนด้วยกัน
1. Source แหล่งที่มาของข้อมูล (จำเป็น) ประกอบไปด้วย
– Pub/Sub Topic
– Pub/Sub Subscription
– BigQuery Table
– CSV from Cloud Storage
– JSON from Cloud Storage
– Text files from Cloud Storage
2. Transform ดัดแปลงแก้ไขข้อมูล (ไม่ต้องมีก็ได้) ประกอบไปด้วยทั้งหมด 6 types
– Filter (Python) เป็นการเขียนคัดกรองข้อมูลเช่น age >= 20
– Join เป็นการเพิ่มขั้นตอนเพื่อ Join ข้อมูล เช่น จาก source 1 กับ source 2
– Map fields (Python) เป็นการที่เราต้องการแก้ไขดัดแปลงข้อมูลโดยใช้ python
โดยจะมี 2 วิธีให้เลือก แบบใช้ callable กับไม่ใช้ การใช้ callable คือการที่เราต้องแก้ไขข้อมูลแบบ complex มากขึ้นโดยเป็นการเขียน python function เช่นเรามี column ชื่อว่า thai_year เราต้องการเปลี่ยนให้เป็น ค.ศ. ส่วนถ้าจะไม่ใช้ callable ก็เพียงเขียน python ง่ายๆให้นึกถึงเวลาเราจะ ใส่ค่าอะไรไปในตัวแปร เช่น ad_year = thai_year — 543 เราก็ใส่ในช่องแค่ thai_year — 543 ดังตัวอย่าง ⬇️


– Map fields (SQL) เป็นการที่เราต้องการแก้ไขดัดแปลงข้อมูลโดยใช้ภาษา SQL หลักการเขียนง่ายๆเลยให้นึกถึง เวลาเราเขียน sql ก็มี select from ถูกไหมแต่ในที่นี้เราเอามาแค่ select พอ เช่น SELECT thai_year — 543 AS ad_year FROM some_table เราก็เอามาแค่ thai_year — 543 ⬇️

– Group by เป็นการ aggregate ข้อมูลเช่นทำการ sum, averge, max min etc…
– Explode เป็นการแตก value ที่เป็น array ให้เป็น row ใหม่
3. Sink จุดหมายปลายทางของข้อมูล (จำเป็น)
– Pub/Sub Topic
– BigQuery Table
– CSV onCloud Storage
– JSON onCloud Storage
– Text files on Cloud Storage
Batch job
มาเริ่มกันที่ batch job ก่อน จะเป็นการทำงานทำแล้วจบไป และใช่ครับทุกครั้งก่อนทำอะไร ต้องมี flow แผนการเสมอ No plan no success! 🔥

อย่างแรกก็ต้องมี data ถูกไหมครับ แนะนำไปที่ Mockaroo เพื่อ mock data เราขึ้นมา

หลังจากนั้นก็เอา csv ที่เราสร้างไว้ไปเก็บที่ Google cloud storage เพื่อที่จะนำไปใช้ต่อ แล้วก็กลับไปที่หน้า Job builder เพื่อเลือก source ที่เราเก็บเอาไว้

ขั้นตอนถัดมาเป็นการสร้าง transform โดยเราจะทำการ filter age > 20 และเปลี่ยนข้อมูลเพศให้เป็นตัวย่อจาก Male เป็น M เป็นต้น

หลังจากนั้นทำการเลือก sink หรือจุดหมายปลายทางของ pipeline อย่างในนี้จะเป็น Bigquery table

และในส่วนของ Optional Parameter เราสามารถปรับได้ตามชอบใจอย่างเช่น Machine type, network vpc, min max worker เป็นต้น เสร็จแล้ว save เพื่อกลับนำไปใช้งานเป็น template ครั้งต่อไปได้

หลังจากกด RUN JOB ทาง Dataflow จะนำเราไปที่หน้า job monitoring ประกอบไปด้วย Job graph, job metrics, cost และ log ต่างๆ แล้วในระหว่างที่ job run นั้นจะมีกราฟให้ดูเลยว่ากำลัง process อยู่ในขั้นตอนและ status ต่างๆ

พอ pipeline เราทำงานเสร็จแล้วก็ไปดูที่ sink ของเราในที่นี้คือ ซึ่งในที่นี้เราได้ทำการ filter age > 20 และเปลี่ยน gender จากตัวเต็มเป็นแบบย่อ จบเรียบร้อยสำหรับ Batch pipeline

Streaming job
ไปกันต่อ🔥 ครั้งนี้จะเป็นการทำ realtime pipeline โดยจะมี Flow ดังนี้⬇️

หน้าตาของ data เราที่ mock มาในตอนนี้จะเป็นหน้าตาแบบนี้

เริ่มต้นเรามาสร้าง Pub/Sub topic subscribe ให้เรียบร้อย เป็น messaging service จะเป็นที่ๆเราจะมาทำ streaming ingestion

พอเรามี Pub/Sub แล้วก็มาสร้าง Dataflow pipeline กันโดยเลือกให้เป็นรูปแบบ streaming เพราะเราจะมาทำกันในรูปแบบ realtime โดยเราจำเป็นต้องเลือก windowing ด้วย ในที่นี้ผมจะเลือก fixed window ที่ 60 วิ
ถ้าใครไม่รู้จัก windowing คืออะไรแนะนำให้ดูที่ โพสนี้ของผมเอง https://www.facebook.com/share/p/9rRx45csBS7iXvWY/

เลือก source ถ้าเราอยากทำ etl ที่มีการ transform แนะนำให้เลือก json format ถ้า messege ของเรารับมาในรูปแบบ json แล้วก็ใส่ field(column, key) ให้ครบ แต่ทั้งนี้ขึ้นอยู่กับ data ที่เราทำอีกที

ในขั้นตอน transform ก็จะมี filter age > 20 เหมือนกันกับ Batch แต่เราจะมาเพิ่มในส่วนของ group by โดยจะเลือก field หลักก็คือ id และที่ต้องการ sum ก็คือ payment_usd เพราะเราอยากทราบว่าทุก 1 นาทีแต่ละ id จ่ายไปเท่าไหร่ ถ้ามองเป็น SQL = SELECT id, SUM(payment_usd) FROM table GROUP BY id

ในด้านของ sink ก็เหมือนเดิมเราเลือกที่จะไปวางไว้ที่ Bigquery ตาม flow ⬇️ แล้วมาเริ่ม RUN JOB กัน

พอ Status ขึ้นว่า Running แล้วก็แสดงว่าตอนนี้ Dataflow เริ่มทำงานแล้วมาเริ่มส่งข้อมูลไปที่ Pub/Sub ได้เลย โดยใช้ภาษาอะไรก็ได้ในการส่งข้อมูลไป Pub/Sub

แต่ของผู้เขียนจะเป็นการเขียนpython script เพื่อส่งข้อมูลไปยัง Pub/Sub ดูวิธีเขียนตาม Document ของ Pub/Sub ตามนี้ https://cloud.google.com/python/docs/reference/pubsub/latest#example-usage

หลังจากที่เราทำการส่งข้อมูลไปแล้วกันไปดูที่ Bigquery
เย้..เสร็จแล้ว streaming pipeline 🔥✌️

Conclusion
ก็จบไปแล้วการทำ Dataflow pipeline ในรูปแบบ ETL ที่มีทั้ง Batch และ Streaming ผ่านการทำ Data pipeline ที่หน้า console ของ Dataflow
ตัวDataflow นั้นค่อนข้างจะใช้งานได้ง่ายและทำความเข้าได้ไม่ยากหนักหากขบวนการ ETL ELT เรานั้นไม่ได้มีความยุ่งยากมากนักและโดยเฉพาะถ้าต้นทางปลายทางของเรานั้นอยู่บน GCP อยู่แล้วจะทำให้การทำงานได้สะดวกขึ้นมาก😎
Bonus! schedule a batch pipeline
ถ้าเราได้สร้าง pipeline ไว้แล้วมันสามารถทำงานได้ตามที่เราต้องการ เราก็อยากจะสร้าง schedule เพื่อให้ batch ของเราทำงานทุกๆช่วงเวลาที่เราต้องการถูกไหมครับ เช่น ทุกๆ 1 ชั่วโมง หรือ ทุกๆวันเวลา 00.00 เป็นต้น วิธีการก็คือ
- ไปที่แท็บ Pipelines แล้ว Create data pipeline
- ตั้งชื่อ Pipeline ของเราในช่อง Pipeline name แล้วก็เลือก template ถ้า template ที่เราใช้เป็นแบบ Google manage template ให้เลือกที่เราอยากใช้ได้เลย แต่ถ้าเรา save pipeline ของเราจากการสร้างจาก Job builder ให้เราไปเลือกที่ YAML แล้วก็ที่ optional parameter ให้เลือก file ที่เรา save เอาไว้ส่วน Schedule ก็เลือกตามต้องการเลย Hour, daily แต่อยากกำนหดเองให้เลือก custom แล้วใส่ค่าเป็น unix-cron format


กด Create pipeline เป็นอันเรียบร้อยเดี๋ยว pipeline ของเราก็จะทำงานเองตามกำหนดเวลาที่เราตั้งเอาไว้ 😎

ข้อคิดเห็นส่วนตัว
มีอย่างหนึ่งที่อยากแนะนำอย่างหนึ่งคือถ้าคุณจำเป็นจะต้อง ทำงานแบบ mini batch เช่น 5 mins จะไม่ค่อยเหมาะเท่าไหร่สำหรับการใช้งาน Dataflow เพราะการทำงานของ Dataflow นั้นจำเป็นต้องสร้าง vm ขึ้นมาแล้วค่อยโหลด dependencies ต่างๆ ทำให้การทำงาน Batch หนึ่งครั้งในรูปแบบงานง่ายๆตั้งแต่ต้นจนจบอาจจะใช้เวลาไปทั้งสิ้น 5–10 นาทีได้ งานเล็กไวๆแนะนำว่ามองไปที่ Cloud run function อาจจะเหมาะกว่าก็ได้
What’s next!
Part 2 จะเป็นการพูดถึง Apache Beam ที่เราจะมีการเขียนเพื่อนำมาสร้าง Pipeline มาอ่านกันว่า Apache Beam คืออะไร ทำงานยังไง🎉🎉🎉
Source:
https://cloud.google.com/dataflow/docs/overview
https://beam.apache.org/documentation/sdks/yaml/
Follow me here!
Medium: https://medium.com/@puk.kriangsak
linkedin: https://www.linkedin.com/in/kriangsak-sumthong/
FB page: https://www.facebook.com/profile.php?id=61563097228247
Leave a comment