,

GCP Cloud Dataflow | Part 1 | The introduction

Part 1 ทำความรู้จักว่าทำงานยังไง ทำอะไรได้บ้าง ตัวอย่างการใช้งานผ่าน Job builder

The introduction of Cloud Dataflow

ในโลกที่หมุนด้วยข้อมูลต่างๆนั้น การรวบรวมข้อมูลก็เป็นเรื่องสำคัญไม่แพ้กัน แต่จะเอาข้อมูลมาใช้ได้ยังไง ในเมื่อข้อมูลเราอยู่ตามที่ต่างๆ ไม่ว่าจะเป็น csv, database (mysql, mongodb, sql server), pub/sub etc.., การทำ ETL (extract transform load) ELT(extract load transform) จึงเป็นส่วนสำคัญในลำดับต้นๆที่ทางธุรกิจจำเป็นต้องทำเพื่อที่จะนำข้อมูลไปใช้ได้

ELT and ETL

ด้านผู้บริการด้าน Cloud computing ที่ชื่อว่า Google cloud platform ก็มีบริการที่เรียกว่า Cloud dataflow ที่มี open source ชื่อว่า Apache Beam ในการทำงานอยู่เบื้องหลังนั้นเป็นเครื่องมือที่สามารถทำ etl elt โดยเป็นได้ในรูปแบบ batch หรือ realtime ก็ได้ขึ้นอยู๋กับงานที่เราต้องการจะทำ

ตัวอย่างการทำ ETL ผ่าน Dataflow pipeline

Part 1 จะเป็นการแนะนำความสามารถเบื้องต้นและตัวอย่างการทำ ETL ELT ในรูปแบบทั้ง realtime และ batch

ทำไมต้อง Cloud Dataflow
Managed : กล่าวคือทาง GCP จัดการ resouce ให้หมดทุกอย่างเช่น สร้าง vm ขึ้นมา โดยเรามีหน้าทำแค่สร้าง template และเลือก setting ต่างๆเท่านั้น
Scalable : Dataflow นั้นเราสามารถเลือกให้ทำงานแบบ autoscale ได้
Portable : โดยหลังบ้านของ Daflow นั้นทำงานด้วย Apache Beam SDKs ซึ่ง Beam นี้ไม่ได้จำเป็นจะต้องเขียนเพื่อนำมาใช้งานที่ Dataflow อย่างเดียวสามารถนำไปใช้งานที่อื่นได้ด้วยเพียงแก้ไขเล็กน้อย หรือ จะเป็นการนำจากที่อื่นเช่น Apache Flink 
Flexible : สามารถใช้งานได้ทั้งรูปแบบเขียนโค้ดเอง หรือไปเลือกจาก template ที่ทาง GCP มีไว้ให้
Observable : ทาง GCP จัดการเรื่อง monitor ให้กับเราเรียบร้อยโดยเราสามารถเข้าไปดูการทำงานของ Dataflow ได้ตลอดเวลาและเช็คได้ว่า error หรือ failed เกิดจากอะไร
Cost effective : ราคาการใช้งานของ Dataflow นั้นราคาเท่ากับการเปิด VM ใช้เลยกล่าวคือ ใช้เท่าไหร่จ่ายเท่านั้น


เจาะลึกการใช้งาน Dataflow

โดย Dataflow จะมีรูปแบบการเลือกใช้งานได้ 4 รูปแบบ 
1. เลือกใช้จาก Job template ที่ทาง GCP สร้างไว้ให้
2. สร้างจาก Job builder (เพิ่งเปิดให้ใช้งานตอนปี 2024 ก่อนหน้านี้ไม่มีให้เลือกเศร้ามาก 😢 และ source sink ยังน้อยมีให้เลือกแค่ Pub/Sub GCS และ Bigquery)
3. เขียนโค้ดขึ้นมาเองผ่าน Apache Beam SDKs แล้วนำไปสร้างเป็น Custom template
4. เขียนผ่าน jupyter notebook โดยเขียนผ่าน Apache Beam SDKs

การเลือกใช้ว่าจะใช้รูปแบบไหนขึ้นอยู่กับขั้นตอนของเรามีความยุ่งยากไหม
No complex extract load job = Job template
Some transform but still not so complex ETL job = Job builder
Complex transform ETL job และ souce sink ไม่มีให้เลือกจากตัวเลือกก่อนหน้า = custom template with Apche Beam or Jupyter Notebook
Job template → Job builder → Custom Job with either Beam or Notebook


Job template

การเลือกใช้จาก Job template ที่ทาง GCP สร้างไว้ให้ก่อนเพียงเราแค่ใส่ค่า parameter จำพวก source และ sink เท่านั้นและอื่นๆอีกเล็กน้อย มาดูกันว่ามี Dataflow template อะไรบ้าง โดยแบ่งออกเป็น 2 แบบ Streaming และ Batch ตัวอย่าง template เช่น

Streaming (Realtime) template:

Batch template:

ตัวอย่างหน้า console ของ Dataflow ผ่าน Dataflow templates

Job builder

Job builder คือการสร้าง pipeline ผ่านหน้า console โดยระหว่างที่เรากำลังเพิ่ม parameter ต่างๆ ทางด้านขวาจะมีหน้าต่าง visualize flow มาให้ดูด้วยว่า flow ของ pipeline หน้าตาประมาณไหนซึ่งตอนนี้ Source และ Sink ยังมีแค่ Pub/Sub, Cloud storage และ Bigqeury เท่านั้นเอง แต่เราจะมีตัวอย่างการใช้งานซึ่งอ่านจบทำเป็นแน่นอน👌✌️

Dataflow Job builder

หลักๆแล้วหน้าต่างของ job builder จะมีสามส่วนด้วยกัน 
1. Source แหล่งที่มาของข้อมูล (จำเป็น) ประกอบไปด้วย
– Pub/Sub Topic
– Pub/Sub Subscription
– BigQuery Table
– CSV from Cloud Storage
– JSON from Cloud Storage
– Text files from Cloud Storage

2. Transform ดัดแปลงแก้ไขข้อมูล (ไม่ต้องมีก็ได้) ประกอบไปด้วยทั้งหมด 6 types
– Filter (Python) เป็นการเขียนคัดกรองข้อมูลเช่น age >= 20
– Join เป็นการเพิ่มขั้นตอนเพื่อ Join ข้อมูล เช่น จาก source 1 กับ source 2
– Map fields (Python) เป็นการที่เราต้องการแก้ไขดัดแปลงข้อมูลโดยใช้ python
โดยจะมี 2 วิธีให้เลือก แบบใช้ callable กับไม่ใช้ การใช้ callable คือการที่เราต้องแก้ไขข้อมูลแบบ complex มากขึ้นโดยเป็นการเขียน python function เช่นเรามี column ชื่อว่า thai_year เราต้องการเปลี่ยนให้เป็น ค.ศ. ส่วนถ้าจะไม่ใช้ callable ก็เพียงเขียน python ง่ายๆให้นึกถึงเวลาเราจะ ใส่ค่าอะไรไปในตัวแปร เช่น ad_year = thai_year — 543 เราก็ใส่ในช่องแค่ thai_year — 543 ดังตัวอย่าง ⬇️

Callable python
Non callable python

– Map fields (SQL) เป็นการที่เราต้องการแก้ไขดัดแปลงข้อมูลโดยใช้ภาษา SQL หลักการเขียนง่ายๆเลยให้นึกถึง เวลาเราเขียน sql ก็มี select from ถูกไหมแต่ในที่นี้เราเอามาแค่ select พอ เช่น SELECT thai_year — 543 AS ad_year FROM some_table เราก็เอามาแค่ thai_year — 543 ⬇️

SQL map field

– Group by เป็นการ aggregate ข้อมูลเช่นทำการ sum, averge, max min etc…
– Explode เป็นการแตก value ที่เป็น array ให้เป็น row ใหม่

3. Sink จุดหมายปลายทางของข้อมูล (จำเป็น)
– Pub/Sub Topic
– BigQuery Table
– CSV onCloud Storage
– JSON onCloud Storage
– Text files on Cloud Storage

Batch job
มาเริ่มกันที่ batch job ก่อน จะเป็นการทำงานทำแล้วจบไป และใช่ครับทุกครั้งก่อนทำอะไร ต้องมี flow แผนการเสมอ No plan no success! 🔥

flow ตัวอย่าง

อย่างแรกก็ต้องมี data ถูกไหมครับ แนะนำไปที่ Mockaroo เพื่อ mock data เราขึ้นมา

Mock data from Mockaroo.com

หลังจากนั้นก็เอา csv ที่เราสร้างไว้ไปเก็บที่ Google cloud storage เพื่อที่จะนำไปใช้ต่อ แล้วก็กลับไปที่หน้า Job builder เพื่อเลือก source ที่เราเก็บเอาไว้

ขั้นตอนถัดมาเป็นการสร้าง transform โดยเราจะทำการ filter age > 20 และเปลี่ยนข้อมูลเพศให้เป็นตัวย่อจาก Male เป็น M เป็นต้น

หลังจากนั้นทำการเลือก sink หรือจุดหมายปลายทางของ pipeline อย่างในนี้จะเป็น Bigquery table

และในส่วนของ Optional Parameter เราสามารถปรับได้ตามชอบใจอย่างเช่น Machine type, network vpc, min max worker เป็นต้น เสร็จแล้ว save เพื่อกลับนำไปใช้งานเป็น template ครั้งต่อไปได้

หลังจากกด RUN JOB ทาง Dataflow จะนำเราไปที่หน้า job monitoring ประกอบไปด้วย Job graph, job metrics, cost และ log ต่างๆ แล้วในระหว่างที่ job run นั้นจะมีกราฟให้ดูเลยว่ากำลัง process อยู่ในขั้นตอนและ status ต่างๆ

succeed job

พอ pipeline เราทำงานเสร็จแล้วก็ไปดูที่ sink ของเราในที่นี้คือ ซึ่งในที่นี้เราได้ทำการ filter age > 20 และเปลี่ยน gender จากตัวเต็มเป็นแบบย่อ จบเรียบร้อยสำหรับ Batch pipeline

Job complete✌️

Streaming job

ไปกันต่อ🔥 ครั้งนี้จะเป็นการทำ realtime pipeline โดยจะมี Flow ดังนี้⬇️

อยากรู้ยอดขายทุกๆ 1 นาทีได้กี่บาท

หน้าตาของ data เราที่ mock มาในตอนนี้จะเป็นหน้าตาแบบนี้

mock data

เริ่มต้นเรามาสร้าง Pub/Sub topic subscribe ให้เรียบร้อย เป็น messaging service จะเป็นที่ๆเราจะมาทำ streaming ingestion

Google Pub/Sub a messeging service

พอเรามี Pub/Sub แล้วก็มาสร้าง Dataflow pipeline กันโดยเลือกให้เป็นรูปแบบ streaming เพราะเราจะมาทำกันในรูปแบบ realtime โดยเราจำเป็นต้องเลือก windowing ด้วย ในที่นี้ผมจะเลือก fixed window ที่ 60 วิ

ถ้าใครไม่รู้จัก windowing คืออะไรแนะนำให้ดูที่ โพสนี้ของผมเอง https://www.facebook.com/share/p/9rRx45csBS7iXvWY/

เลือก source ถ้าเราอยากทำ etl ที่มีการ transform แนะนำให้เลือก json format ถ้า messege ของเรารับมาในรูปแบบ json แล้วก็ใส่ field(column, key) ให้ครบ แต่ทั้งนี้ขึ้นอยู่กับ data ที่เราทำอีกที

ในขั้นตอน transform ก็จะมี filter age > 20 เหมือนกันกับ Batch แต่เราจะมาเพิ่มในส่วนของ group by โดยจะเลือก field หลักก็คือ id และที่ต้องการ sum ก็คือ payment_usd เพราะเราอยากทราบว่าทุก 1 นาทีแต่ละ id จ่ายไปเท่าไหร่ ถ้ามองเป็น SQL = SELECT id, SUM(payment_usd) FROM table GROUP BY id

ในด้านของ sink ก็เหมือนเดิมเราเลือกที่จะไปวางไว้ที่ Bigquery ตาม flow ⬇️ แล้วมาเริ่ม RUN JOB กัน

พอ Status ขึ้นว่า Running แล้วก็แสดงว่าตอนนี้ Dataflow เริ่มทำงานแล้วมาเริ่มส่งข้อมูลไปที่ Pub/Sub ได้เลย โดยใช้ภาษาอะไรก็ได้ในการส่งข้อมูลไป Pub/Sub

Dataflow running status

แต่ของผู้เขียนจะเป็นการเขียนpython script เพื่อส่งข้อมูลไปยัง Pub/Sub ดูวิธีเขียนตาม Document ของ Pub/Sub ตามนี้ https://cloud.google.com/python/docs/reference/pubsub/latest#example-usage

python script ส่งข้อมูลจาก csv → Pub/Sub

หลังจากที่เราทำการส่งข้อมูลไปแล้วกันไปดูที่ Bigquery 
เย้..เสร็จแล้ว streaming pipeline 🔥✌️


Conclusion

ก็จบไปแล้วการทำ Dataflow pipeline ในรูปแบบ ETL ที่มีทั้ง Batch และ Streaming ผ่านการทำ Data pipeline ที่หน้า console ของ Dataflow 
ตัวDataflow นั้นค่อนข้างจะใช้งานได้ง่ายและทำความเข้าได้ไม่ยากหนักหากขบวนการ ETL ELT เรานั้นไม่ได้มีความยุ่งยากมากนักและโดยเฉพาะถ้าต้นทางปลายทางของเรานั้นอยู่บน GCP อยู่แล้วจะทำให้การทำงานได้สะดวกขึ้นมาก😎

Bonus! schedule a batch pipeline

ถ้าเราได้สร้าง pipeline ไว้แล้วมันสามารถทำงานได้ตามที่เราต้องการ เราก็อยากจะสร้าง schedule เพื่อให้ batch ของเราทำงานทุกๆช่วงเวลาที่เราต้องการถูกไหมครับ เช่น ทุกๆ 1 ชั่วโมง หรือ ทุกๆวันเวลา 00.00 เป็นต้น วิธีการก็คือ

  • ไปที่แท็บ Pipelines แล้ว Create data pipeline
  • ตั้งชื่อ Pipeline ของเราในช่อง Pipeline name แล้วก็เลือก template ถ้า template ที่เราใช้เป็นแบบ Google manage template ให้เลือกที่เราอยากใช้ได้เลย แต่ถ้าเรา save pipeline ของเราจากการสร้างจาก Job builder ให้เราไปเลือกที่ YAML แล้วก็ที่ optional parameter ให้เลือก file ที่เรา save เอาไว้ส่วน Schedule ก็เลือกตามต้องการเลย Hour, daily แต่อยากกำนหดเองให้เลือก custom แล้วใส่ค่าเป็น unix-cron format

กด Create pipeline เป็นอันเรียบร้อยเดี๋ยว pipeline ของเราก็จะทำงานเองตามกำหนดเวลาที่เราตั้งเอาไว้ 😎

pipeline แบบ cool cool

ข้อคิดเห็นส่วนตัว

มีอย่างหนึ่งที่อยากแนะนำอย่างหนึ่งคือถ้าคุณจำเป็นจะต้อง ทำงานแบบ mini batch เช่น 5 mins จะไม่ค่อยเหมาะเท่าไหร่สำหรับการใช้งาน Dataflow เพราะการทำงานของ Dataflow นั้นจำเป็นต้องสร้าง vm ขึ้นมาแล้วค่อยโหลด dependencies ต่างๆ ทำให้การทำงาน Batch หนึ่งครั้งในรูปแบบงานง่ายๆตั้งแต่ต้นจนจบอาจจะใช้เวลาไปทั้งสิ้น 5–10 นาทีได้ งานเล็กไวๆแนะนำว่ามองไปที่ Cloud run function อาจจะเหมาะกว่าก็ได้

What’s next!

Part 2 จะเป็นการพูดถึง Apache Beam ที่เราจะมีการเขียนเพื่อนำมาสร้าง Pipeline มาอ่านกันว่า Apache Beam คืออะไร ทำงานยังไง🎉🎉🎉

Source:

https://cloud.google.com/dataflow/docs/overview
https://beam.apache.org/documentation/sdks/yaml/


Follow me here!

Medium: https://medium.com/@puk.kriangsak
linkedin: https://www.linkedin.com/in/kriangsak-sumthong/
FB page: https://www.facebook.com/profile.php?id=61563097228247

Please subscribe!

Leave a comment