Problem 2: Implement a relational join as a MapReduce query
Consider the query:
SELECT * FROM Orders, LineItem WHERE Order.order_id = LineItem.order_id
Your MapReduce query should produce the same information as this SQL query. You can consider the two input tables, Order and LineItem, as one big concatenated bag of records which gets fed into the map function record by record.
Map Input
The input will be database records formatted as lists of Strings.
Every list element corresponds to a different field in its corresponding record.
The first item (index 0) in each record is a string that identifies which table the record originates from. This field has two possible values:
‘line_item’ indicates that the record is a line item.
‘order’ indicates that the record is an order.
The second element (index 1) in each record is the order_id.
LineItem records have 17 elements including the identifier string.
Order records have 10 elements including the identifier string.
Reduce Output
The output should be a joined record.
The result should be a single list of length 27 that contains the fields from the order record followed by the fields from the line item record. Each list element should be a string.
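The input and output shapes described above can be sketched without the MapReduce framework. This is only an illustration under stated assumptions: the function name join_records and the placeholder field values are made up for this example, and a plain dictionary stands in for the grouping that the framework performs between map and reduce.

```python
from collections import defaultdict

def join_records(records):
    """Join 'order' and 'line_item' records on order_id (index 1)."""
    # Stand-in for the shuffle phase: group whole records by order_id.
    groups = defaultdict(list)
    for record in records:
        groups[record[1]].append(record)

    joined = []
    for order_id, group in groups.items():
        orders = [r for r in group if r[0] == 'order']
        items = [r for r in group if r[0] == 'line_item']
        # Pair every order with every line item that shares its order_id.
        for order in orders:
            for item in items:
                joined.append(order + item)
    return joined

# Made-up records with placeholder field values (hypothetical data).
order = ['order', '1'] + ['o%d' % i for i in range(8)]          # 10 elements
item_a = ['line_item', '1'] + ['a%d' % i for i in range(15)]    # 17 elements
item_b = ['line_item', '1'] + ['b%d' % i for i in range(15)]    # 17 elements
unmatched = ['line_item', '2'] + ['c%d' % i for i in range(15)] # no matching order

result = join_records([order, item_a, item_b, unmatched])
for row in result:
    # Prints "27 order 1 line_item 1" once per joined row (two rows here).
    print(len(row), row[0], row[1], row[10], row[11])
```

The line item with order_id 2 has no matching order, so it produces no output, which is the behaviour of the inner join in the SQL query.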
You can test your solution to this problem using records.json:
python join.py records.json
You can verify your solution against join.json.
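One possible way to do that comparison is sketched below. It assumes that both the program output and join.json contain one JSON list per line, and that the order of the joined records does not matter; neither assumption is stated in the problem text. The helper names load_records and same_records are hypothetical.

```python
import json
from collections import Counter

def load_records(lines):
    """Parse one JSON list per line, skipping blank lines."""
    return [json.loads(line) for line in lines if line.strip()]

def same_records(actual_lines, expected_lines):
    """Compare two outputs as multisets, ignoring record order."""
    # Records are flat lists of strings, so tuples of them are hashable.
    actual = Counter(tuple(r) for r in load_records(actual_lines))
    expected = Counter(tuple(r) for r in load_records(expected_lines))
    return actual == expected

# In-memory example: same records, different order.
a = ['["order", "1", "x"]', '["order", "2", "y"]']
b = ['["order", "2", "y"]', '["order", "1", "x"]']
print(same_records(a, b))  # True
```

To compare real files, the same function can be given two open file objects, for example the output of join.py redirected to a file and join.json.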
import MapReduce
import sys

"""
Relational Join Example in the Simple Python MapReduce Framework
"""

mr = MapReduce.MapReduce()

# =============================
# Do not modify above this line

TABLE_1_NAME = 'order'
TABLE_2_NAME = 'line_item'

def mapper(record):
    # record: list of strings; record[0] is the table name,
    # record[1] is the order_id, the rest are the remaining fields
    table_name = record[0]
    order_id = record[1]
    table_fields = record[2:]
    # key: order_id; value: [table name, remaining fields]
    mr.emit_intermediate(order_id, [table_name, table_fields])

def reducer(key, list_of_values):
    # key: order_id
    # list_of_values: [table name, remaining fields] pairs sharing this order_id
    table1 = [value[1] for value in list_of_values if value[0] == TABLE_1_NAME]
    table2 = [value[1] for value in list_of_values if value[0] == TABLE_2_NAME]
    # Emit one joined record per (order, line item) pair:
    # the order fields first, then the line item fields
    for record1 in table1:
        for record2 in table2:
            res = []
            res.append(TABLE_1_NAME)
            res.append(key)
            res.extend(record1)
            res.append(TABLE_2_NAME)
            res.append(key)
            res.extend(record2)
            mr.emit(res)

# Do not modify below this line
# =============================
if __name__ == '__main__':
    inputdata = open(sys.argv[1])
    mr.execute(inputdata, mapper, reducer)