Problem 2

Implement a relational join as a MapReduce query.

Consider the query:

SELECT * FROM Orders, LineItem WHERE Orders.order_id = LineItem.order_id

Your MapReduce query should produce the same information as this SQL query. You can treat the two input tables, Orders and LineItem, as one big concatenated bag of records that is fed into the map function record by record.

Map Input

The input will be database records formatted as lists of Strings.

Every list element corresponds to a different field in its corresponding record.

The first item (index 0) in each record is a string that identifies which table the record originates from. This field has two possible values:

    ‘line_item’ indicates that the record is a line item.

    ‘order’ indicates that the record is an order.

The second element (index 1) in each record is the order_id.

LineItem records have 17 elements including the identifier string.

Order records have 10 elements including the identifier string.

Reduce Output

The output should be a joined record.

The result should be a single list of length 27 that contains the fields from the order record followed by the fields from the line item record. Each list element should be a string.
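To make the record layout concrete, here is a small sketch of one order, one line item, and the joined record they produce. The field values are made-up placeholders, not data from records.json; only the lengths and the positions of the table name and order_id follow the description above.

```python
# Hypothetical placeholder records; only the shape matches the problem statement.
order = ["order", "1"] + ["order_field_%d" % i for i in range(2, 10)]          # 10 elements
line_item = ["line_item", "1"] + ["item_field_%d" % i for i in range(2, 17)]  # 17 elements

# Both records carry the same order_id at index 1, so they satisfy the join condition.
assert order[1] == line_item[1]

# Joined record: every order field followed by every line item field.
joined = order + line_item

print(len(joined))            # 27
print(joined[0], joined[10])  # order line_item
```

Note that the order_id appears twice in the joined record (at index 1 and index 11), just as it would appear in both tables' columns in the result of SELECT *.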

You can test your solution to this problem using records.json:

    python join.py records.json

You can verify your solution against join.json.
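One way to do the comparison is sketched below. It assumes that both your output and join.json hold one JSON-encoded record per line, and that the order of records does not matter; neither assumption is stated in the problem, so adjust as needed. The helper name same_records is hypothetical.

```python
import json
from collections import Counter


def same_records(actual_lines, expected_lines):
    """Return True if both inputs contain the same JSON records, in any order.

    Assumes one JSON document per line; blank lines are ignored.
    """
    def normalise(lines):
        # Re-encode each record so whitespace differences do not matter,
        # and count duplicates so repeated records must match too.
        return Counter(
            json.dumps(json.loads(line), sort_keys=True)
            for line in lines
            if line.strip()
        )

    return normalise(actual_lines) == normalise(expected_lines)


# In-memory demonstration with made-up records.
mine = ['["order", "1", "line_item", "1"]', '["order", "2", "line_item", "2"]']
expected = ['["order","2","line_item","2"]', '', '["order","1","line_item","1"]']
print(same_records(mine, expected))  # True
```

If the framework prints its results to standard output, you could redirect them to a file (say, a hypothetical my_output.json) and call same_records(open("my_output.json"), open("join.json")).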

import MapReduce
import sys
 
"""
Relational Join Example in the Simple Python MapReduce Framework
"""
 
mr = MapReduce.MapReduce()
 
# =============================
# Do not modify above this line
TABLE_1_NAME = 'order'
TABLE_2_NAME = 'line_item'
 
def mapper(record):
    # record: list of strings; record[0] names the source table,
    # record[1] is the order_id, and the rest are the remaining fields.
    table_name = record[0]
    order_id = record[1]
    table_fields = record[2:]

    # Key on order_id so matching orders and line items reach the same reducer call.
    mr.emit_intermediate(order_id, [table_name, table_fields])
 
def reducer(key, list_of_values):
    # key: order_id
    # list_of_values: [table_name, table_fields] pairs emitted by the mapper
    table1 = [value[1] for value in list_of_values if value[0] == TABLE_1_NAME]
    table2 = [value[1] for value in list_of_values if value[0] == TABLE_2_NAME]

    # Emit one joined record per (order, line item) pair that shares this order_id.
    for record1 in table1:
        for record2 in table2:
            # Restore the table name and order_id that the mapper split off.
            res = [TABLE_1_NAME, key] + record1 + [TABLE_2_NAME, key] + record2
            mr.emit(res)
 
 
 
# Do not modify below this line
# =============================
if __name__ == '__main__':
  inputdata = open(sys.argv[1])
  mr.execute(inputdata, mapper, reducer)
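If you want to experiment without the course's MapReduce module, the sketch below uses a hypothetical stand-in class, LocalMapReduce, that only mimics the three calls used above (emit_intermediate, emit, execute). It is an assumption about how such a framework behaves, not the real module. The mapper here is also simplified: it emits the whole record instead of splitting off the table name, and the sample records are made up and much shorter than the real ones.

```python
import json
from collections import defaultdict


class LocalMapReduce:
    """Hypothetical stand-in for the course's MapReduce class (not the real module)."""

    def __init__(self):
        self.intermediate = defaultdict(list)  # key -> list of emitted values
        self.result = []

    def emit_intermediate(self, key, value):
        self.intermediate[key].append(value)

    def emit(self, value):
        self.result.append(value)

    def execute(self, lines, mapper, reducer):
        # Map phase: one JSON record per input line.
        for line in lines:
            mapper(json.loads(line))
        # Reduce phase: one call per distinct key.
        for key in self.intermediate:
            reducer(key, self.intermediate[key])
        return self.result


mr = LocalMapReduce()


def mapper(record):
    # Group whole records by order_id (index 1).
    mr.emit_intermediate(record[1], record)


def reducer(key, records):
    orders = [r for r in records if r[0] == "order"]
    line_items = [r for r in records if r[0] == "line_item"]
    # Pair every order with every line item that shares this order_id.
    for order in orders:
        for line_item in line_items:
            mr.emit(order + line_item)


# Made-up, shortened records: order 1 has two line items, order_id 2 has no order.
lines = [
    json.dumps(["order", "1", "a"]),
    json.dumps(["line_item", "1", "x"]),
    json.dumps(["line_item", "1", "y"]),
    json.dumps(["line_item", "2", "z"]),
]
result = mr.execute(lines, mapper, reducer)
for joined in result:
    print(json.dumps(joined))
```

The line item with order_id 2 produces no output because no order shares its key, which is the behaviour of the inner join in the SQL query.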
